use std::{
collections::HashMap,
marker::PhantomData,
ops::{Deref, DerefMut},
str::FromStr,
sync::Arc,
time::Duration,
};
use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use stanza::client::Stanza;
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
use tracing::info;
use crate::{Connected, Logic};
use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
/// read actor
pub struct Read<Lgc> {
stream: BoundJabberReader<Tls>,
disconnecting: bool,
disconnect_timedout: oneshot::Receiver<()>,
// all the threads spawned by the current connection session
tasks: JoinSet<()>,
// for handling incoming stanzas
// jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
// control stuff
control_receiver: mpsc::Receiver<ReadControl>,
on_crash: oneshot::Sender<ReadState>,
}
/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
pub struct ReadState {
pub supervisor_control: SupervisorSender,
// TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
pub tasks: JoinSet<()>,
}
impl<Lgc> Read<Lgc> {
fn new(
stream: BoundJabberReader<Tls>,
tasks: JoinSet<()>,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
control_receiver: mpsc::Receiver<ReadControl>,
on_crash: oneshot::Sender<ReadState>,
) -> Self {
let (_send, recv) = oneshot::channel();
Self {
stream,
disconnecting: false,
disconnect_timedout: recv,
tasks,
connected,
logic,
supervisor_control,
control_receiver,
on_crash,
}
}
}
impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
async fn run(mut self) {
println!("started read thread");
// let stanza = self.stream.read::<Stanza>().await;
// println!("{:?}", stanza);
loop {
tokio::select! {
// if still haven't received the end tag in time, just kill itself
// TODO: is this okay??? what if notification thread dies?
Ok(()) = &mut self.disconnect_timedout => {
info!("disconnect_timedout");
break;
}
Some(msg) = self.control_receiver.recv() => {
match msg {
// when disconnect received,
ReadControl::Disconnect => {
let (send, recv) = oneshot::channel();
self.disconnect_timedout = recv;
self.disconnecting = true;
tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(10)).await;
let _ = send.send(());
})
},
ReadControl::Abort(sender) => {
let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
break;
},
};
},
s = self.stream.read::<Stanza>() => {
println!("read stanza");
match s {
Ok(s) => {
self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
},
Err(e) => {
println!("error: {:?}", e);
// TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
// match e {
// peanuts::Error::ReadError(error) => todo!(),
// peanuts::Error::Utf8Error(utf8_error) => todo!(),
// peanuts::Error::ParseError(_) => todo!(),
// peanuts::Error::EntityProcessError(_) => todo!(),
// peanuts::Error::InvalidCharRef(_) => todo!(),
// peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
// peanuts::Error::DuplicateAttribute(_) => todo!(),
// peanuts::Error::UnqualifiedNamespace(_) => todo!(),
// peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
// peanuts::Error::NotInElement(_) => todo!(),
// peanuts::Error::ExtraData(_) => todo!(),
// peanuts::Error::UndeclaredNamespace(_) => todo!(),
// peanuts::Error::IncorrectName(name) => todo!(),
// peanuts::Error::DeserializeError(_) => todo!(),
// peanuts::Error::Deserialize(deserialize_error) => todo!(),
// peanuts::Error::RootElementEnded => todo!(),
// }
// TODO: make sure this only happens when an end tag is received
if self.disconnecting == true {
break;
} else {
let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
}
break;
},
}
},
else => break
}
}
println!("stopping read thread");
self.logic.on_abort().await;
}
}
// what do stanza processes do?
// - update ui
// - access database
// - disconnect proper, reconnect
// - respond to server requests
pub enum ReadControl {
Disconnect,
Abort(oneshot::Sender<ReadState>),
}
pub struct ReadControlHandle {
sender: mpsc::Sender<ReadControl>,
pub(crate) handle: JoinHandle<()>,
}
impl Deref for ReadControlHandle {
type Target = mpsc::Sender<ReadControl>;
fn deref(&self) -> &Self::Target {
&self.sender
}
}
impl DerefMut for ReadControlHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.sender
}
}
impl ReadControlHandle {
pub fn new<Lgc: Clone + Logic + Send + 'static>(
stream: BoundJabberReader<Tls>,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<ReadState>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
stream,
JoinSet::new(),
connected,
logic,
supervisor_control,
control_receiver,
on_crash,
);
let handle = tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
handle,
}
}
pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
stream: BoundJabberReader<Tls>,
tasks: JoinSet<()>,
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
on_crash: oneshot::Sender<ReadState>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
let actor = Read::new(
stream,
tasks,
connected,
logic,
supervisor_control,
control_receiver,
on_crash,
);
let handle = tokio::spawn(async move { actor.run().await });
Self {
sender: control_sender,
handle,
}
}
}