diff options
Diffstat (limited to 'filamento/src/logic/mod.rs')
-rw-r--r-- | filamento/src/logic/mod.rs | 69 |
1 files changed, 57 insertions, 12 deletions
diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs index 61d78bf..15c2d12 100644 --- a/filamento/src/logic/mod.rs +++ b/filamento/src/logic/mod.rs @@ -1,16 +1,21 @@ use std::{collections::HashMap, sync::Arc}; -use lampada::{Logic, error::ReadError}; +use lampada::{Connected, Logic, error::ReadError}; use stanza::client::Stanza; use tokio::sync::{Mutex, mpsc, oneshot}; use tracing::{error, info, warn}; -use crate::{Client, Command, UpdateMessage, db::Db, error::Error}; +use crate::{ + Client, Command, UpdateMessage, + db::Db, + error::{Error, RequestError, ResponseError}, +}; mod abort; mod connect; mod connection_error; mod disconnect; +mod local_only; mod offline; mod online; mod process_stanza; @@ -19,20 +24,60 @@ mod process_stanza; pub struct ClientLogic { client: Client, db: Db, - pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, + pending: Pending, update_sender: mpsc::Sender<UpdateMessage>, } +#[derive(Clone)] +pub struct Pending(Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>); + +impl Pending { + pub fn new() -> Self { + Self(Arc::new(Mutex::new(HashMap::new()))) + } + + pub async fn request( + &self, + connection: &Connected, + request: Stanza, + id: String, + ) -> Result<Stanza, RequestError> { + let (send, recv) = oneshot::channel(); + { + self.0.lock().await.insert(id, send); + } + connection.write_handle().write(request).await?; + let stanza = recv.await.map_err(|e| ReadError::Actor(e.into()))??; + Ok(stanza) + } + + pub async fn respond(&self, response: Stanza, id: String) -> Result<(), ResponseError> { + let send; + { + send = self.0.lock().await.remove(&id); + } + match send { + Some(send) => { + let _ = send.send(Ok(response)); + Ok(()) + } + None => Err(ResponseError::NoMatchingId(id)), + } + } + + pub async fn drain(&self) { + let mut pending = self.0.lock().await; + for (_id, sender) in pending.drain() { + let _ = sender.send(Err(ReadError::LostConnection)); + } + } +} + impl ClientLogic { - pub fn new( - client: Client, - db: Db, - pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, - update_sender: mpsc::Sender<UpdateMessage>, - ) -> Self { + pub fn new(client: Client, db: Db, update_sender: mpsc::Sender<UpdateMessage>) -> Self { Self { db, - pending, + pending: Pending::new(), update_sender, client, } @@ -46,8 +91,8 @@ impl ClientLogic { &self.db } - pub fn pending(&self) -> &Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>> { - &self.pending.as_ref() + pub fn pending(&self) -> &Pending { + &self.pending } pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> { |