aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'filamento/src/logic/mod.rs')
-rw-r--r--filamento/src/logic/mod.rs69
1 files changed, 57 insertions, 12 deletions
diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs
index 61d78bf..15c2d12 100644
--- a/filamento/src/logic/mod.rs
+++ b/filamento/src/logic/mod.rs
@@ -1,16 +1,21 @@
use std::{collections::HashMap, sync::Arc};
-use lampada::{Logic, error::ReadError};
+use lampada::{Connected, Logic, error::ReadError};
use stanza::client::Stanza;
use tokio::sync::{Mutex, mpsc, oneshot};
use tracing::{error, info, warn};
-use crate::{Client, Command, UpdateMessage, db::Db, error::Error};
+use crate::{
+ Client, Command, UpdateMessage,
+ db::Db,
+ error::{Error, RequestError, ResponseError},
+};
mod abort;
mod connect;
mod connection_error;
mod disconnect;
+mod local_only;
mod offline;
mod online;
mod process_stanza;
@@ -19,20 +24,60 @@ mod process_stanza;
pub struct ClientLogic {
client: Client,
db: Db,
- pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
+ pending: Pending,
update_sender: mpsc::Sender<UpdateMessage>,
}
+#[derive(Clone)]
+pub struct Pending(Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>);
+
+impl Pending {
+ pub fn new() -> Self {
+ Self(Arc::new(Mutex::new(HashMap::new())))
+ }
+
+ pub async fn request(
+ &self,
+ connection: &Connected,
+ request: Stanza,
+ id: String,
+ ) -> Result<Stanza, RequestError> {
+ let (send, recv) = oneshot::channel();
+ {
+ self.0.lock().await.insert(id, send);
+ }
+ connection.write_handle().write(request).await?;
+ let stanza = recv.await.map_err(|e| ReadError::Actor(e.into()))??;
+ Ok(stanza)
+ }
+
+ pub async fn respond(&self, response: Stanza, id: String) -> Result<(), ResponseError> {
+ let send;
+ {
+ send = self.0.lock().await.remove(&id);
+ }
+ match send {
+ Some(send) => {
+ let _ = send.send(Ok(response));
+ Ok(())
+ }
+ None => Err(ResponseError::NoMatchingId(id)),
+ }
+ }
+
+ pub async fn drain(&self) {
+ let mut pending = self.0.lock().await;
+ for (_id, sender) in pending.drain() {
+ let _ = sender.send(Err(ReadError::LostConnection));
+ }
+ }
+}
+
impl ClientLogic {
- pub fn new(
- client: Client,
- db: Db,
- pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
- update_sender: mpsc::Sender<UpdateMessage>,
- ) -> Self {
+ pub fn new(client: Client, db: Db, update_sender: mpsc::Sender<UpdateMessage>) -> Self {
Self {
db,
- pending,
+ pending: Pending::new(),
update_sender,
client,
}
@@ -46,8 +91,8 @@ impl ClientLogic {
&self.db
}
- pub fn pending(&self) -> &Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>> {
- &self.pending.as_ref()
+ pub fn pending(&self) -> &Pending {
+ &self.pending
}
pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {