use std::{collections::HashMap, sync::Arc}; use lampada::{Connected, Logic, error::ReadError}; use stanza::client::Stanza; use tokio::sync::{Mutex, mpsc, oneshot}; use tracing::{error, info, warn}; use crate::{ Client, Command, UpdateMessage, db::Db, error::{Error, RequestError, ResponseError}, }; mod abort; mod connect; mod connection_error; mod disconnect; mod local_only; mod offline; mod online; mod process_stanza; #[derive(Clone)] pub struct ClientLogic { client: Client, db: Db, pending: Pending, update_sender: mpsc::Sender, } #[derive(Clone)] pub struct Pending(Arc>>>>); impl Pending { pub fn new() -> Self { Self(Arc::new(Mutex::new(HashMap::new()))) } pub async fn request( &self, connection: &Connected, request: Stanza, id: String, ) -> Result { let (send, recv) = oneshot::channel(); { self.0.lock().await.insert(id, send); } connection.write_handle().write(request).await?; let stanza = recv.await.map_err(|e| ReadError::Actor(e.into()))??; Ok(stanza) } pub async fn respond(&self, response: Stanza, id: String) -> Result<(), ResponseError> { let send; { send = self.0.lock().await.remove(&id); } match send { Some(send) => { let _ = send.send(Ok(response)); Ok(()) } None => Err(ResponseError::NoMatchingId(id)), } } pub async fn drain(&self) { let mut pending = self.0.lock().await; for (_id, sender) in pending.drain() { let _ = sender.send(Err(ReadError::LostConnection)); } } } impl ClientLogic { pub fn new(client: Client, db: Db, update_sender: mpsc::Sender) -> Self { Self { db, pending: Pending::new(), update_sender, client, } } pub fn client(&self) -> &Client { &self.client } pub fn db(&self) -> &Db { &self.db } pub fn pending(&self) -> &Pending { &self.pending } pub fn update_sender(&self) -> &mpsc::Sender { &self.update_sender } pub async fn handle_update(&self, update: UpdateMessage) { // TODO: impl fmt info!("{:?}", update); self.update_sender().send(update).await; } pub async fn handle_error(&self, e: Error) { error!("{}", e); } } impl Logic for ClientLogic { type Cmd = Command; // pub async fn handle_stream_error(self, error) {} // stanza errors (recoverable) // pub async fn handle_error(self, error: Error) {} // when it aborts, must clear iq map no matter what async fn handle_connect(self, connection: lampada::Connected) { connect::handle_connect(self, connection).await; } async fn handle_disconnect(self, connection: lampada::Connected) { disconnect::handle_disconnect(self, connection).await; } async fn handle_stanza(self, stanza: ::stanza::client::Stanza, connection: lampada::Connected) { process_stanza::handle_stanza(self, stanza, connection).await; } async fn handle_online(self, command: Self::Cmd, connection: lampada::Connected) { online::handle_online(self, command, connection).await; } async fn handle_offline(self, command: Self::Cmd) { offline::handle_offline(self, command).await; } async fn on_abort(self) { abort::on_abort(self).await; } async fn handle_connection_error(self, error: lampada::error::ConnectionError) { connection_error::handle_connection_error(self, error).await; } async fn handle_stream_error(self, stream_error: stanza::stream::Error) { self.handle_error(Error::Stream(stream_error)).await; } }