aboutsummaryrefslogtreecommitdiffstats
path: root/luz/src/connection/read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'luz/src/connection/read.rs')
-rw-r--r--luz/src/connection/read.rs383
1 files changed, 57 insertions, 326 deletions
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index d510c5d..d310a12 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -22,66 +22,59 @@ use crate::{
error::{Error, IqError, MessageRecvError, PresenceError, ReadError, RosterError},
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
- UpdateMessage,
+ Connected, LogicState, UpdateMessage,
};
-use super::{write::WriteHandle, SupervisorCommand};
+use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
+/// read actor
pub struct Read {
- control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- db: Db,
- update_sender: mpsc::Sender<UpdateMessage>,
- supervisor_control: mpsc::Sender<SupervisorCommand>,
- write_handle: WriteHandle,
- tasks: JoinSet<()>,
disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>,
- // TODO: use proper stanza ids
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+
+ // all the threads spawned by the current connection session
+ tasks: JoinSet<()>,
+
+ // for handling incoming stanzas
+ // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
+ connected: Connected,
+ logic: LogicState,
+ supervisor_control: SupervisorSender,
+
+ // control stuff
+ control_receiver: mpsc::Receiver<ReadControl>,
+ on_crash: oneshot::Sender<ReadState>,
+}
+
+/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
+pub struct ReadState {
+ pub supervisor_control: SupervisorSender,
+ // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
+ pub tasks: JoinSet<()>,
}
impl Read {
fn new(
- control_receiver: mpsc::Receiver<ReadControl>,
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- db: Db,
- update_sender: mpsc::Sender<UpdateMessage>,
- // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
- supervisor_control: mpsc::Sender<SupervisorCommand>,
- write_handle: WriteHandle,
tasks: JoinSet<()>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ connected: Connected,
+ logic: LogicState,
+ supervisor_control: SupervisorSender,
+ control_receiver: mpsc::Receiver<ReadControl>,
+ on_crash: oneshot::Sender<ReadState>,
) -> Self {
let (_send, recv) = oneshot::channel();
Self {
- control_receiver,
stream,
- on_crash,
- db,
- update_sender,
- supervisor_control,
- write_handle,
- tasks,
disconnecting: false,
disconnect_timedout: recv,
- pending_iqs,
+ tasks,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
}
}
@@ -110,7 +103,7 @@ impl Read {
})
},
ReadControl::Abort(sender) => {
- let _ = sender.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
+ let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
break;
},
};
@@ -119,7 +112,7 @@ impl Read {
println!("read stanza");
match s {
Ok(s) => {
- self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone(), self.pending_iqs.clone()));
+ self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
},
Err(e) => {
println!("error: {:?}", e);
@@ -146,8 +139,7 @@ impl Read {
if self.disconnecting == true {
break;
} else {
- // AAAAAAAAAAAAAAAAAAAAA i should really just have this stored in the supervisor and not gaf bout passing these references around
- let _ = self.on_crash.send((self.db, self.update_sender, self.tasks, self.supervisor_control, self.write_handle, self.pending_iqs.clone()));
+ let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
}
break;
},
@@ -158,7 +150,7 @@ impl Read {
}
println!("stopping read thread");
// when it aborts, must clear iq map no matter what
- let mut iqs = self.pending_iqs.lock().await;
+ let mut iqs = self.logic.pending.lock().await;
for (_id, sender) in iqs.drain() {
let _ = sender.send(Err(ReadError::LostConnection));
}
@@ -170,249 +162,10 @@ impl Read {
// - access database
// - disconnect proper, reconnect
// - respond to server requests
-async fn handle_stanza(
- stanza: Stanza,
- update_sender: mpsc::Sender<UpdateMessage>,
- db: Db,
- supervisor_control: mpsc::Sender<SupervisorCommand>,
- write_handle: WriteHandle,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
-) {
- match stanza {
- Stanza::Message(stanza_message) => {
- if let Some(mut from) = stanza_message.from {
- // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
- let timestamp = stanza_message
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- // TODO: group chat messages
- let mut message = Message {
- id: stanza_message
- .id
- // TODO: proper id storage
- .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
- .unwrap_or_else(|| Uuid::new_v4()),
- from: from.clone(),
- timestamp,
- body: Body {
- // TODO: should this be an option?
- body: stanza_message
- .body
- .map(|body| body.body)
- .unwrap_or_default()
- .unwrap_or_default(),
- },
- };
- // TODO: can this be more efficient?
- let result = db
- .create_message_with_user_resource_and_chat(message.clone(), from.clone())
- .await;
- if let Err(e) = result {
- tracing::error!("messagecreate");
- let _ = update_sender
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MessageHistory(e.into()),
- )))
- .await;
- }
- message.from = message.from.as_bare();
- from = from.as_bare();
- let _ = update_sender
- .send(UpdateMessage::Message { to: from, message })
- .await;
- } else {
- let _ = update_sender
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MissingFrom,
- )))
- .await;
- }
- }
- Stanza::Presence(presence) => {
- if let Some(from) = presence.from {
- match presence.r#type {
- Some(r#type) => match r#type {
- // error processing a presence from somebody
- stanza::client::presence::PresenceType::Error => {
- // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Presence(
- // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
- PresenceError::StanzaError(
- presence
- .errors
- .first()
- .cloned()
- .expect("error MUST have error"),
- ),
- )))
- .await;
- }
- // should not happen (error to server)
- stanza::client::presence::PresenceType::Probe => {
- // TODO: should probably write an error and restart stream
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::Unsupported,
- )))
- .await;
- }
- stanza::client::presence::PresenceType::Subscribe => {
- // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
- let _ = update_sender
- .send(UpdateMessage::SubscriptionRequest(from))
- .await;
- }
- stanza::client::presence::PresenceType::Unavailable => {
- let offline = Offline {
- status: presence.status.map(|status| status.status.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = update_sender
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Offline(offline),
- },
- })
- .await;
- }
- // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
- stanza::client::presence::PresenceType::Subscribed => {}
- stanza::client::presence::PresenceType::Unsubscribe => {}
- stanza::client::presence::PresenceType::Unsubscribed => {}
- },
- None => {
- let online = Online {
- show: presence.show.map(|show| match show {
- stanza::client::presence::Show::Away => Show::Away,
- stanza::client::presence::Show::Chat => Show::Chat,
- stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
- stanza::client::presence::Show::Xa => Show::ExtendedAway,
- }),
- status: presence.status.map(|status| status.status.0),
- priority: presence.priority.map(|priority| priority.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = update_sender
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Online(online),
- },
- })
- .await;
- }
- }
- } else {
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::MissingFrom,
- )))
- .await;
- }
- }
- Stanza::Iq(iq) => match iq.r#type {
- stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
- let send;
- {
- send = pending_iqs.lock().await.remove(&iq.id);
- }
- if let Some(send) = send {
- send.send(Ok(Stanza::Iq(iq)));
- } else {
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
- iq.id,
- ))))
- .await;
- }
- }
- // TODO: send unsupported to server
- // TODO: proper errors i am so tired please
- stanza::client::iq::IqType::Get => {}
- stanza::client::iq::IqType::Set => {
- if let Some(query) = iq.query {
- match query {
- stanza::client::iq::Query::Roster(mut query) => {
- // TODO: there should only be one
- if let Some(item) = query.items.pop() {
- match item.subscription {
- Some(stanza::roster::Subscription::Remove) => {
- db.delete_contact(item.jid.clone()).await;
- update_sender
- .send(UpdateMessage::RosterDelete(item.jid))
- .await;
- // TODO: send result
- }
- _ => {
- let contact: Contact = item.into();
- if let Err(e) = db.upsert_contact(contact.clone()).await {
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Roster(
- RosterError::Cache(e.into()),
- )))
- .await;
- }
- let _ = update_sender
- .send(UpdateMessage::RosterUpdate(contact))
- .await;
- // TODO: send result
- // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
- // from: ,
- // id: todo!(),
- // to: todo!(),
- // r#type: todo!(),
- // lang: todo!(),
- // query: todo!(),
- // errors: todo!(),
- // }));
- }
- }
- }
- }
- // TODO: send unsupported to server
- _ => {}
- }
- } else {
- // TODO: send error (unsupported) to server
- }
- }
- },
- Stanza::Error(error) => {
- let _ = update_sender
- .send(UpdateMessage::Error(Error::Stream(error)))
- .await;
- // TODO: reconnect
- }
- Stanza::OtherContent(content) => {
- let _ = update_sender.send(UpdateMessage::Error(Error::UnrecognizedContent(content)));
- // TODO: send error to write_thread
- }
- }
-}
pub enum ReadControl {
Disconnect,
- Abort(
- oneshot::Sender<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- ),
+ Abort(oneshot::Sender<ReadState>),
}
pub struct ReadControlHandle {
@@ -437,32 +190,21 @@ impl DerefMut for ReadControlHandle {
impl ReadControlHandle {
pub fn new(
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- db: Db,
- sender: mpsc::Sender<UpdateMessage>,
- supervisor_control: mpsc::Sender<SupervisorCommand>,
- jabber_write: WriteHandle,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ connected: Connected,
+ logic: LogicState,
+ supervisor_control: SupervisorSender,
+ on_crash: oneshot::Sender<ReadState>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
- control_receiver,
stream,
- on_crash,
- db,
- sender,
- supervisor_control,
- jabber_write,
JoinSet::new(),
- pending_iqs,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
);
let handle = tokio::spawn(async move { actor.run().await });
@@ -474,33 +216,22 @@ impl ReadControlHandle {
pub fn reconnect(
stream: BoundJabberReader<Tls>,
- on_crash: oneshot::Sender<(
- Db,
- mpsc::Sender<UpdateMessage>,
- JoinSet<()>,
- mpsc::Sender<SupervisorCommand>,
- WriteHandle,
- Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- )>,
- db: Db,
- sender: mpsc::Sender<UpdateMessage>,
- supervisor_control: mpsc::Sender<SupervisorCommand>,
- jabber_write: WriteHandle,
tasks: JoinSet<()>,
- pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ connected: Connected,
+ logic: LogicState,
+ supervisor_control: SupervisorSender,
+ on_crash: oneshot::Sender<ReadState>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
- control_receiver,
stream,
- on_crash,
- db,
- sender,
- supervisor_control,
- jabber_write,
tasks,
- pending_iqs,
+ connected,
+ logic,
+ supervisor_control,
+ control_receiver,
+ on_crash,
);
let handle = tokio::spawn(async move { actor.run().await });