use std::{collections::HashMap, sync::Arc};
use lampada::{Connected, Logic, error::ReadError};
use stanza::client::Stanza;
use tokio::sync::{Mutex, mpsc, oneshot};
use tracing::{error, info, warn};
use crate::{
Client, Command, UpdateMessage,
db::Db,
error::{Error, RequestError, ResponseError},
};
mod abort;
mod connect;
mod connection_error;
mod disconnect;
mod local_only;
mod offline;
mod online;
mod process_stanza;
#[derive(Clone)]
pub struct ClientLogic {
client: Client,
db: Db,
pending: Pending,
update_sender: mpsc::Sender<UpdateMessage>,
}
#[derive(Clone)]
pub struct Pending(Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>);
impl Pending {
pub fn new() -> Self {
Self(Arc::new(Mutex::new(HashMap::new())))
}
pub async fn request(
&self,
connection: &Connected,
request: Stanza,
id: String,
) -> Result<Stanza, RequestError> {
let (send, recv) = oneshot::channel();
{
self.0.lock().await.insert(id, send);
}
connection.write_handle().write(request).await?;
let stanza = recv.await.map_err(|e| ReadError::Actor(e.into()))??;
Ok(stanza)
}
pub async fn respond(&self, response: Stanza, id: String) -> Result<(), ResponseError> {
let send;
{
send = self.0.lock().await.remove(&id);
}
match send {
Some(send) => {
let _ = send.send(Ok(response));
Ok(())
}
None => Err(ResponseError::NoMatchingId(id)),
}
}
pub async fn drain(&self) {
let mut pending = self.0.lock().await;
for (_id, sender) in pending.drain() {
let _ = sender.send(Err(ReadError::LostConnection));
}
}
}
impl ClientLogic {
pub fn new(client: Client, db: Db, update_sender: mpsc::Sender<UpdateMessage>) -> Self {
Self {
db,
pending: Pending::new(),
update_sender,
client,
}
}
pub fn client(&self) -> &Client {
&self.client
}
pub fn db(&self) -> &Db {
&self.db
}
pub fn pending(&self) -> &Pending {
&self.pending
}
pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {
&self.update_sender
}
pub async fn handle_update(&self, update: UpdateMessage) {
// TODO: impl fmt
info!("{:?}", update);
self.update_sender().send(update).await;
}
pub async fn handle_error(&self, e: Error) {
error!("{}", e);
}
}
impl Logic for ClientLogic {
type Cmd = Command;
// pub async fn handle_stream_error(self, error) {}
// stanza errors (recoverable)
// pub async fn handle_error(self, error: Error) {}
// when it aborts, must clear iq map no matter what
async fn handle_connect(self, connection: lampada::Connected) {
connect::handle_connect(self, connection).await;
}
async fn handle_disconnect(self, connection: lampada::Connected) {
disconnect::handle_disconnect(self, connection).await;
}
async fn handle_stanza(self, stanza: ::stanza::client::Stanza, connection: lampada::Connected) {
process_stanza::handle_stanza(self, stanza, connection).await;
}
async fn handle_online(self, command: Self::Cmd, connection: lampada::Connected) {
online::handle_online(self, command, connection).await;
}
async fn handle_offline(self, command: Self::Cmd) {
offline::handle_offline(self, command).await;
}
async fn on_abort(self) {
abort::on_abort(self).await;
}
async fn handle_connection_error(self, error: lampada::error::ConnectionError) {
connection_error::handle_connection_error(self, error).await;
}
async fn handle_stream_error(self, stream_error: stanza::stream::Error) {
self.handle_error(Error::Stream(stream_error)).await;
}
}