diff options
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r-- | luz/src/connection/read.rs | 383 |
1 files changed, 57 insertions, 326 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index d510c5d..d310a12 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -22,66 +22,59 @@ use crate::{ error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError}, presence::{Offline, Online, Presence, PresenceType, Show}, roster::Contact, - UpdateMessage, + Connected, LogicState, UpdateMessage, }; -use super::{write::WriteHandle, SupervisorCommand}; +use super::{write::WriteHandle, SupervisorCommand, SupervisorSender}; +/// read actor pub struct Read { - control_receiver: mpsc::Receiver<ReadControl>, stream: BoundJabberReader<Tls>, - on_crash: oneshot::Sender<( - Db, - mpsc::Sender<UpdateMessage>, - JoinSet<()>, - mpsc::Sender<SupervisorCommand>, - WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, - )>, - db: Db, - update_sender: mpsc::Sender<UpdateMessage>, - supervisor_control: mpsc::Sender<SupervisorCommand>, - write_handle: WriteHandle, - tasks: JoinSet<()>, disconnecting: bool, disconnect_timedout: oneshot::Receiver<()>, - // TODO: use proper stanza ids - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, + + // all the threads spawned by the current connection session + tasks: JoinSet<()>, + + // for handling incoming stanzas + // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) + connected: Connected, + logic: LogicState, + supervisor_control: SupervisorSender, + + // control stuff + control_receiver: mpsc::Receiver<ReadControl>, + on_crash: oneshot::Sender<ReadState>, +} + +/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue +pub struct ReadState { + pub supervisor_control: SupervisorSender, + // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream + pub tasks: JoinSet<()>, } impl Read { fn new( - control_receiver: mpsc::Receiver<ReadControl>, stream: BoundJabberReader<Tls>, - on_crash: oneshot::Sender<( - Db, - mpsc::Sender<UpdateMessage>, - JoinSet<()>, - mpsc::Sender<SupervisorCommand>, - WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, - )>, - db: Db, - update_sender: mpsc::Sender<UpdateMessage>, - // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) - supervisor_control: mpsc::Sender<SupervisorCommand>, - write_handle: WriteHandle, tasks: JoinSet<()>, - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, + connected: Connected, + logic: LogicState, + supervisor_control: SupervisorSender, + control_receiver: mpsc::Receiver<ReadControl>, + on_crash: oneshot::Sender<ReadState>, ) -> Self { let (_send, recv) = oneshot::channel(); Self { - control_receiver, stream, - on_crash, - db, - update_sender, - supervisor_control, - write_handle, - tasks, disconnecting: false, disconnect_timedout: recv, - pending_iqs, + tasks, + connected, + logic, + supervisor_control, + control_receiver, + on_crash, } } @@ -110,7 +103,7 @@ impl Read { }) }, ReadControl::Abort(sender) => { - let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone())); + let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); break; }, }; @@ -119,7 +112,7 @@ impl Read { println!("read stanza"); match s { Ok(s) => { - self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone(), self.pending_iqs.clone())); + self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone())); }, Err(e) => { println!("error: {:?}", e); @@ -146,8 +139,7 @@ impl Read { if self.disconnecting == true { break; } else { - // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around - let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone())); + let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }); } break; }, @@ -158,7 +150,7 @@ impl Read { } println!("stopping read thread"); // when it aborts, must clear iq map no matter what - let mut iqs = self.pending_iqs.lock().await; + let mut iqs = self.logic.pending.lock().await; for (_id, sender) in iqs.drain() { let _ = sender.send(Err(ReadError::LostConnection)); } @@ -170,249 +162,10 @@ impl Read { // - access database // - disconnect proper, reconnect // - respond to server requests -async fn handle_stanza( - stanza: Stanza, - update_sender: mpsc::Sender<UpdateMessage>, - db: Db, - supervisor_control: mpsc::Sender<SupervisorCommand>, - write_handle: WriteHandle, - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, -) { - match stanza { - Stanza::Message(stanza_message) => { - if let Some(mut from) = stanza_message.from { - // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. - let timestamp = stanza_message - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - // TODO: group chat messages - let mut message = Message { - id: stanza_message - .id - // TODO: proper id storage - .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) - .unwrap_or_else(|| Uuid::new_v4()), - from: from.clone(), - timestamp, - body: Body { - // TODO: should this be an option? - body: stanza_message - .body - .map(|body| body.body) - .unwrap_or_default() - .unwrap_or_default(), - }, - }; - // TODO: can this be more efficient? - let result = db - .create_message_with_user_resource_and_chat(message.clone(), from.clone()) - .await; - if let Err(e) = result { - tracing::error!("messagecreate"); - let _ = update_sender - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MessageHistory(e.into()), - ))) - .await; - } - message.from = message.from.as_bare(); - from = from.as_bare(); - let _ = update_sender - .send(UpdateMessage::Message { to: from, message }) - .await; - } else { - let _ = update_sender - .send(UpdateMessage::Error(Error::MessageRecv( - MessageRecvError::MissingFrom, - ))) - .await; - } - } - Stanza::Presence(presence) => { - if let Some(from) = presence.from { - match presence.r#type { - Some(r#type) => match r#type { - // error processing a presence from somebody - stanza::client::presence::PresenceType::Error => { - // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. - let _ = update_sender - .send(UpdateMessage::Error(Error::Presence( - // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display - PresenceError::StanzaError( - presence - .errors - .first() - .cloned() - .expect("error MUST have error"), - ), - ))) - .await; - } - // should not happen (error to server) - stanza::client::presence::PresenceType::Probe => { - // TODO: should probably write an error and restart stream - let _ = update_sender - .send(UpdateMessage::Error(Error::Presence( - PresenceError::Unsupported, - ))) - .await; - } - stanza::client::presence::PresenceType::Subscribe => { - // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event - let _ = update_sender - .send(UpdateMessage::SubscriptionRequest(from)) - .await; - } - stanza::client::presence::PresenceType::Unavailable => { - let offline = Offline { - status: presence.status.map(|status| status.status.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = update_sender - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Offline(offline), - }, - }) - .await; - } - // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. - stanza::client::presence::PresenceType::Subscribed => {} - stanza::client::presence::PresenceType::Unsubscribe => {} - stanza::client::presence::PresenceType::Unsubscribed => {} - }, - None => { - let online = Online { - show: presence.show.map(|show| match show { - stanza::client::presence::Show::Away => Show::Away, - stanza::client::presence::Show::Chat => Show::Chat, - stanza::client::presence::Show::Dnd => Show::DoNotDisturb, - stanza::client::presence::Show::Xa => Show::ExtendedAway, - }), - status: presence.status.map(|status| status.status.0), - priority: presence.priority.map(|priority| priority.0), - }; - let timestamp = presence - .delay - .map(|delay| delay.stamp) - .unwrap_or_else(|| Utc::now()); - let _ = update_sender - .send(UpdateMessage::Presence { - from, - presence: Presence { - timestamp, - presence: PresenceType::Online(online), - }, - }) - .await; - } - } - } else { - let _ = update_sender - .send(UpdateMessage::Error(Error::Presence( - PresenceError::MissingFrom, - ))) - .await; - } - } - Stanza::Iq(iq) => match iq.r#type { - stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { - let send; - { - send = pending_iqs.lock().await.remove(&iq.id); - } - if let Some(send) = send { - send.send(Ok(Stanza::Iq(iq))); - } else { - let _ = update_sender - .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( - iq.id, - )))) - .await; - } - } - // TODO: send unsupported to server - // TODO: proper errors i am so tired please - stanza::client::iq::IqType::Get => {} - stanza::client::iq::IqType::Set => { - if let Some(query) = iq.query { - match query { - stanza::client::iq::Query::Roster(mut query) => { - // TODO: there should only be one - if let Some(item) = query.items.pop() { - match item.subscription { - Some(stanza::roster::Subscription::Remove) => { - db.delete_contact(item.jid.clone()).await; - update_sender - .send(UpdateMessage::RosterDelete(item.jid)) - .await; - // TODO: send result - } - _ => { - let contact: Contact = item.into(); - if let Err(e) = db.upsert_contact(contact.clone()).await { - let _ = update_sender - .send(UpdateMessage::Error(Error::Roster( - RosterError::Cache(e.into()), - ))) - .await; - } - let _ = update_sender - .send(UpdateMessage::RosterUpdate(contact)) - .await; - // TODO: send result - // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { - // from: , - // id: todo!(), - // to: todo!(), - // r#type: todo!(), - // lang: todo!(), - // query: todo!(), - // errors: todo!(), - // })); - } - } - } - } - // TODO: send unsupported to server - _ => {} - } - } else { - // TODO: send error (unsupported) to server - } - } - }, - Stanza::Error(error) => { - let _ = update_sender - .send(UpdateMessage::Error(Error::Stream(error))) - .await; - // TODO: reconnect - } - Stanza::OtherContent(content) => { - let _ = update_sender.send(UpdateMessage::Error(Error::UnrecognizedContent(content))); - // TODO: send error to write_thread - } - } -} pub enum ReadControl { Disconnect, - Abort( - oneshot::Sender<( - Db, - mpsc::Sender<UpdateMessage>, - JoinSet<()>, - mpsc::Sender<SupervisorCommand>, - WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, - )>, - ), + Abort(oneshot::Sender<ReadState>), } pub struct ReadControlHandle { @@ -437,32 +190,21 @@ impl DerefMut for ReadControlHandle { impl ReadControlHandle { pub fn new( stream: BoundJabberReader<Tls>, - on_crash: oneshot::Sender<( - Db, - mpsc::Sender<UpdateMessage>, - JoinSet<()>, - mpsc::Sender<SupervisorCommand>, - WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, - )>, - db: Db, - sender: mpsc::Sender<UpdateMessage>, - supervisor_control: mpsc::Sender<SupervisorCommand>, - jabber_write: WriteHandle, - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, + connected: Connected, + logic: LogicState, + supervisor_control: SupervisorSender, + on_crash: oneshot::Sender<ReadState>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( - control_receiver, stream, - on_crash, - db, - sender, - supervisor_control, - jabber_write, JoinSet::new(), - pending_iqs, + connected, + logic, + supervisor_control, + control_receiver, + on_crash, ); let handle = tokio::spawn(async move { actor.run().await }); @@ -474,33 +216,22 @@ impl ReadControlHandle { pub fn reconnect( stream: BoundJabberReader<Tls>, - on_crash: oneshot::Sender<( - Db, - mpsc::Sender<UpdateMessage>, - JoinSet<()>, - mpsc::Sender<SupervisorCommand>, - WriteHandle, - Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, - )>, - db: Db, - sender: mpsc::Sender<UpdateMessage>, - supervisor_control: mpsc::Sender<SupervisorCommand>, - jabber_write: WriteHandle, tasks: JoinSet<()>, - pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, + connected: Connected, + logic: LogicState, + supervisor_control: SupervisorSender, + on_crash: oneshot::Sender<ReadState>, ) -> Self { let (control_sender, control_receiver) = mpsc::channel(20); let actor = Read::new( - control_receiver, stream, - on_crash, - db, - sender, - supervisor_control, - jabber_write, tasks, - pending_iqs, + connected, + logic, + supervisor_control, + control_receiver, + on_crash, ); let handle = tokio::spawn(async move { actor.run().await }); |