use std::{
collections::HashMap,
ops::{Deref, DerefMut},
str::FromStr,
sync::Arc,
time::Duration,
};
pub use connection::write::WriteMessage;
pub use connection::SupervisorSender;
use error::ConnectionError;
use futures::{future::Fuse, FutureExt};
use luz::JID;
use stanza::client::{
iq::{self, Iq, IqType},
Stanza,
};
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::JoinSet,
time::timeout,
};
use tracing::{debug, info};
use crate::connection::write::WriteHandle;
use crate::connection::{SupervisorCommand, SupervisorHandle};
mod connection;
pub mod error;
#[derive(Clone)]
pub struct Connected {
// full jid will stay stable across reconnections
jid: JID,
write_handle: WriteHandle,
}
impl Connected {
pub fn jid(&self) -> &JID {
&self.jid
}
pub fn write_handle(&self) -> &WriteHandle {
&self.write_handle
}
}
/// everything that a particular xmpp client must implement
pub trait Logic {
/// the command message type
type Cmd;
/// run after binding to the stream (e.g. for a chat client, )
fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;
/// run before closing the stream (e.g. send unavailable presence in a chat client)
fn handle_disconnect(
self,
connection: Connected,
) -> impl std::future::Future<Output = ()> + Send;
/// run to handle an incoming xmpp stanza
fn handle_stanza(
self,
stanza: Stanza,
connection: Connected,
supervisor: SupervisorSender,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run to handle a command message when a connection is currently established
fn handle_online(
self,
command: Self::Cmd,
connection: Connected,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run to handle a command message when disconnected
fn handle_offline(
self,
command: Self::Cmd,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// handle connection errors from the core client logic
fn handle_connection_error(
self,
error: ConnectionError,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
// async fn handle_stream_error(self, error) {}
}
/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core
pub struct CoreClient<Lgc: Logic> {
jid: JID,
// TODO: use a dyn passwordprovider trait to avoid storing password in memory
password: Arc<String>,
receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
connected: Option<(Connected, SupervisorHandle)>,
// TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
// connected_intention: bool,
/// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
logic: Lgc,
// config: LampConfig,
// TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
tasks: JoinSet<()>,
}
impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
/// create a new actor
pub fn new(
jid: JID,
password: String,
receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
connected: Option<(Connected, SupervisorHandle)>,
connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
logic: Lgc,
) -> Self {
Self {
jid,
password: Arc::new(password),
connected,
receiver,
connection_supervisor_shutdown,
logic,
tasks: JoinSet::new(),
}
}
/// run the actor
pub async fn run(mut self) {
loop {
let msg = tokio::select! {
// this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
// THIS IS NOT OKAY LOLLLL - apparently fusing is the best option???
_ = &mut self.connection_supervisor_shutdown => {
self.connected = None;
continue;
}
Some(msg) = self.receiver.recv() => {
msg
},
else => break,
};
match msg {
CoreClientCommand::Connect => {
match self.connected {
Some(_) => {
self.logic
.clone()
.handle_connection_error(ConnectionError::AlreadyConnected)
.await;
}
None => {
let mut jid = self.jid.clone();
let mut domain = jid.domainpart.clone();
// TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
let streams_result =
luz::connect_and_login(&mut jid, &*self.password, &mut domain)
.await;
match streams_result {
Ok(s) => {
debug!("ok stream result");
let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
let (writer, supervisor) = SupervisorHandle::new(
s,
shutdown_send,
jid.clone(),
self.password.clone(),
self.logic.clone(),
);
let shutdown_recv = shutdown_recv.fuse();
self.connection_supervisor_shutdown = shutdown_recv;
let connected = Connected {
jid,
write_handle: writer,
};
self.logic.clone().handle_connect(connected.clone()).await;
self.connected = Some((connected, supervisor));
}
Err(e) => {
tracing::error!("error: {}", e);
self.logic
.clone()
.handle_connection_error(ConnectionError::ConnectionFailed(
e.into(),
))
.await;
}
}
}
};
}
CoreClientCommand::Disconnect => match self.connected {
None => {
self.logic
.clone()
.handle_connection_error(ConnectionError::AlreadyDisconnected)
.await;
}
ref mut c => {
if let Some((connected, supervisor_handle)) = c.take() {
let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
} else {
unreachable!()
};
}
},
CoreClientCommand::Command(command) => {
match self.connected.as_ref() {
Some((w, s)) => self
.tasks
.spawn(self.logic.clone().handle_online(command, w.clone())),
None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
};
}
}
}
}
}
// TODO: generate methods for each with a macro
pub enum CoreClientCommand<C> {
// TODO: login invisible xep-0186
/// connect to XMPP chat server. gets roster and publishes initial presence.
Connect,
/// disconnect from XMPP chat server, sending unavailable presence then closing stream.
Disconnect,
/// TODO: generics
Command(C),
}