aboutsummaryrefslogtreecommitdiffstats
path: root/luz/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r--luz/src/connection/read.rs200
1 files changed, 197 insertions, 3 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index 8f8c4a0..46f1dc9 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
+ str::FromStr,
sync::Arc,
time::Duration,
};
@@ -12,10 +13,14 @@ use tokio::{
task::{JoinHandle, JoinSet},
};
use tracing::info;
+use uuid::Uuid;
use crate::{
+ chat::{Body, Message},
db::Db,
- error::{Error, Reason},
+ error::{Error, IqError, PresenceError, Reason, RecvMessageError},
+ presence::{Offline, Online, Presence, Show},
+ roster::Contact,
UpdateMessage,
};
@@ -116,7 +121,7 @@ impl Read {
println!("read stanza");
match s {
Ok(s) => {
- self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
+ self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone(), self.pending_iqs.clone()));
},
Err(e) => {
println!("error: {:?}", e);
@@ -173,8 +178,197 @@ async fn handle_stanza(
db: Db,
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) {
- println!("{:?}", stanza)
+ match stanza {
+ Stanza::Message(stanza_message) => {
+ if let Some(from) = stanza_message.from {
+ // TODO: group chat messages
+ let message = Message {
+ id: stanza_message
+ .id
+ // TODO: proper id storage
+ .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
+ .unwrap_or_else(|| Uuid::new_v4()),
+ from: from.clone(),
+ body: Body {
+ // TODO: should this be an option?
+ body: stanza_message
+ .body
+ .map(|body| body.body)
+ .unwrap_or_default()
+ .unwrap_or_default(),
+ },
+ };
+ // TODO: can this be more efficient?
+ let result = db
+ .create_message_with_user_and_chat(message.clone(), from.clone())
+ .await;
+ if let Err(e) = result {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
+ .await;
+ }
+ let _ = update_sender
+ .send(UpdateMessage::Message { to: from, message })
+ .await;
+ } else {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::RecvMessage(
+ RecvMessageError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Presence(presence) => {
+ if let Some(from) = presence.from {
+ match presence.r#type {
+ Some(r#type) => match r#type {
+ // error processing a presence from somebody
+ stanza::client::presence::PresenceType::Error => {
+ // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::Presence(PresenceError::Error(
+ Reason::Stanza(presence.errors.first().cloned()),
+ ))))
+ .await;
+ }
+ // should not happen (error to server)
+ stanza::client::presence::PresenceType::Probe => {
+ // TODO: should probably write an error and restart stream
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::Unsupported,
+ )))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Subscribe => {
+ // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
+ let _ = update_sender
+ .send(UpdateMessage::SubscriptionRequest(from))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Unavailable => {
+ let offline = Offline {
+ status: presence.status.map(|status| status.status.0),
+ };
+ let _ = update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence::Offline(offline),
+ })
+ .await;
+ }
+ // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
+ stanza::client::presence::PresenceType::Subscribed => {}
+ stanza::client::presence::PresenceType::Unsubscribe => {}
+ stanza::client::presence::PresenceType::Unsubscribed => {}
+ },
+ None => {
+ let online = Online {
+ show: presence.show.map(|show| match show {
+ stanza::client::presence::Show::Away => Show::Away,
+ stanza::client::presence::Show::Chat => Show::Chat,
+ stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
+ stanza::client::presence::Show::Xa => Show::ExtendedAway,
+ }),
+ status: presence.status.map(|status| status.status.0),
+ priority: presence.priority.map(|priority| priority.0),
+ };
+ let _ = update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence::Online(online),
+ })
+ .await;
+ }
+ }
+ } else {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Iq(iq) => match iq.r#type {
+ stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
+ let send;
+ {
+ send = pending_iqs.lock().await.remove(&iq.id);
+ }
+ if let Some(send) = send {
+ send.send(Ok(Stanza::Iq(iq)));
+ } else {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
+ iq.id,
+ ))))
+ .await;
+ }
+ }
+ // TODO: send unsupported to server
+ // TODO: proper errors i am so tired please
+ stanza::client::iq::IqType::Get => {}
+ stanza::client::iq::IqType::Set => {
+ if let Some(query) = iq.query {
+ match query {
+ stanza::client::iq::Query::Roster(mut query) => {
+ // TODO: there should only be one
+ if let Some(item) = query.items.pop() {
+ match item.subscription {
+ Some(stanza::roster::Subscription::Remove) => {
+ db.delete_contact(item.jid.clone()).await;
+ update_sender
+ .send(UpdateMessage::RosterDelete(item.jid))
+ .await;
+ // TODO: send result
+ }
+ _ => {
+ let contact: Contact = item.into();
+ if let Err(e) = db.upsert_contact(contact.clone()).await {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::CacheUpdate(
+ e.into(),
+ )))
+ .await;
+ }
+ let _ = update_sender
+ .send(UpdateMessage::RosterUpdate(contact))
+ .await;
+ // TODO: send result
+ // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
+ // from: ,
+ // id: todo!(),
+ // to: todo!(),
+ // r#type: todo!(),
+ // lang: todo!(),
+ // query: todo!(),
+ // errors: todo!(),
+ // }));
+ }
+ }
+ }
+ }
+ // TODO: send unsupported to server
+ _ => {}
+ }
+ } else {
+ // TODO: send error (unsupported) to server
+ }
+ }
+ },
+ Stanza::Error(error) => {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::Stream(error)))
+ .await;
+ // TODO: reconnect
+ }
+ Stanza::OtherContent(content) => {
+ let _ = update_sender.send(UpdateMessage::Error(Error::UnrecognizedContent(content)));
+ // TODO: send error to write_thread
+ }
+ }
}
pub enum ReadControl {