aboutsummaryrefslogtreecommitdiffstats
path: root/filamento/src/logic/mod.rs
blob: 15c2d12dcb484ba536677dff453f6c6db25ba780 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use std::{collections::HashMap, sync::Arc};

use lampada::{Connected, Logic, error::ReadError};
use stanza::client::Stanza;
use tokio::sync::{Mutex, mpsc, oneshot};
use tracing::{error, info, warn};

use crate::{
    Client, Command, UpdateMessage,
    db::Db,
    error::{Error, RequestError, ResponseError},
};

mod abort;
mod connect;
mod connection_error;
mod disconnect;
mod local_only;
mod offline;
mod online;
mod process_stanza;

#[derive(Clone)]
pub struct ClientLogic {
    client: Client,
    db: Db,
    pending: Pending,
    update_sender: mpsc::Sender<UpdateMessage>,
}

#[derive(Clone)]
pub struct Pending(Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>);

impl Pending {
    pub fn new() -> Self {
        Self(Arc::new(Mutex::new(HashMap::new())))
    }

    pub async fn request(
        &self,
        connection: &Connected,
        request: Stanza,
        id: String,
    ) -> Result<Stanza, RequestError> {
        let (send, recv) = oneshot::channel();
        {
            self.0.lock().await.insert(id, send);
        }
        connection.write_handle().write(request).await?;
        let stanza = recv.await.map_err(|e| ReadError::Actor(e.into()))??;
        Ok(stanza)
    }

    pub async fn respond(&self, response: Stanza, id: String) -> Result<(), ResponseError> {
        let send;
        {
            send = self.0.lock().await.remove(&id);
        }
        match send {
            Some(send) => {
                let _ = send.send(Ok(response));
                Ok(())
            }
            None => Err(ResponseError::NoMatchingId(id)),
        }
    }

    pub async fn drain(&self) {
        let mut pending = self.0.lock().await;
        for (_id, sender) in pending.drain() {
            let _ = sender.send(Err(ReadError::LostConnection));
        }
    }
}

impl ClientLogic {
    pub fn new(client: Client, db: Db, update_sender: mpsc::Sender<UpdateMessage>) -> Self {
        Self {
            db,
            pending: Pending::new(),
            update_sender,
            client,
        }
    }

    pub fn client(&self) -> &Client {
        &self.client
    }

    pub fn db(&self) -> &Db {
        &self.db
    }

    pub fn pending(&self) -> &Pending {
        &self.pending
    }

    pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {
        &self.update_sender
    }

    pub async fn handle_update(&self, update: UpdateMessage) {
        // TODO: impl fmt
        info!("{:?}", update);
        self.update_sender().send(update).await;
    }

    pub async fn handle_error(&self, e: Error) {
        error!("{}", e);
    }
}

impl Logic for ClientLogic {
    type Cmd = Command;

    // pub async fn handle_stream_error(self, error) {}
    // stanza errors (recoverable)
    // pub async fn handle_error(self, error: Error) {}
    // when it aborts, must clear iq map no matter what

    async fn handle_connect(self, connection: lampada::Connected) {
        connect::handle_connect(self, connection).await;
    }

    async fn handle_disconnect(self, connection: lampada::Connected) {
        disconnect::handle_disconnect(self, connection).await;
    }

    async fn handle_stanza(self, stanza: ::stanza::client::Stanza, connection: lampada::Connected) {
        process_stanza::handle_stanza(self, stanza, connection).await;
    }

    async fn handle_online(self, command: Self::Cmd, connection: lampada::Connected) {
        online::handle_online(self, command, connection).await;
    }

    async fn handle_offline(self, command: Self::Cmd) {
        offline::handle_offline(self, command).await;
    }

    async fn on_abort(self) {
        abort::on_abort(self).await;
    }

    async fn handle_connection_error(self, error: lampada::error::ConnectionError) {
        connection_error::handle_connection_error(self, error).await;
    }

    async fn handle_stream_error(self, stream_error: stanza::stream::Error) {
        self.handle_error(Error::Stream(stream_error)).await;
    }
}