aboutsummaryrefslogtreecommitdiffstats
path: root/lampada/src/lib.rs
blob: a01ba063d8153780ced40de5c1706084b28e4dd1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
use std::{
    collections::HashMap,
    ops::{Deref, DerefMut},
    str::FromStr,
    sync::Arc,
    time::Duration,
};

pub use connection::write::WriteMessage;
pub use connection::SupervisorSender;
use error::ConnectionError;
use futures::{future::Fuse, FutureExt};
use luz::JID;
use stanza::client::{
    iq::{self, Iq, IqType},
    Stanza,
};
use stanza::stream::Error as StreamError;
use tokio::{
    sync::{mpsc, oneshot, Mutex},
    task::JoinSet,
    time::timeout,
};
use tracing::{debug, info};

use crate::connection::write::WriteHandle;
use crate::connection::{SupervisorCommand, SupervisorHandle};

mod connection;
pub mod error;

#[derive(Clone)]
pub struct Connected {
    // full jid will stay stable across reconnections
    jid: JID,
    write_handle: WriteHandle,
}

impl Connected {
    pub fn jid(&self) -> &JID {
        &self.jid
    }

    pub fn write_handle(&self) -> &WriteHandle {
        &self.write_handle
    }
}

/// everything that a particular xmpp client must implement
pub trait Logic {
    /// the command message type
    type Cmd;

    /// run after binding to the stream (e.g. for a chat client, )
    fn handle_connect(self, connection: Connected) -> impl std::future::Future<Output = ()> + Send;

    /// run before closing the stream (e.g. send unavailable presence in a chat client)
    fn handle_disconnect(
        self,
        connection: Connected,
    ) -> impl std::future::Future<Output = ()> + Send;

    fn handle_stream_error(
        self,
        stream_error: StreamError,
    ) -> impl std::future::Future<Output = ()> + Send;

    /// run to handle an incoming xmpp stanza
    fn handle_stanza(
        self,
        stanza: Stanza,
        connection: Connected,
    ) -> impl std::future::Future<Output = ()> + std::marker::Send;

    /// run to handle a command message when a connection is currently established
    fn handle_online(
        self,
        command: Self::Cmd,
        connection: Connected,
    ) -> impl std::future::Future<Output = ()> + std::marker::Send;

    /// run to handle a command message when disconnected
    fn handle_offline(
        self,
        command: Self::Cmd,
    ) -> impl std::future::Future<Output = ()> + std::marker::Send;

    /// run as cleanup after either an abort or a disconnect (e.g. reply to all pending requests with a disconnected error)
    fn on_abort(self) -> impl std::future::Future<Output = ()> + std::marker::Send;

    /// handle connection errors from the core client logic
    fn handle_connection_error(
        self,
        error: ConnectionError,
    ) -> impl std::future::Future<Output = ()> + std::marker::Send;

    // async fn handle_stream_error(self, error) {}
}

/// an actor that implements xmpp core (rfc6120), manages connection/stream status, and delegates any other logic to the generic which implements Logic, allowing different kinds of clients (e.g. chat, social, pubsub) to be built upon the same core
pub struct CoreClient<Lgc: Logic> {
    jid: JID,
    // TODO: use a dyn passwordprovider trait to avoid storing password in memory
    password: Arc<String>,
    receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
    connected: Option<(Connected, SupervisorHandle)>,
    // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later)
    // connected_intention: bool,
    /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected
    connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
    logic: Lgc,
    // config: LampConfig,
    // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?
    tasks: JoinSet<()>,
}

impl<Lgc: Logic + Clone + Send + 'static> CoreClient<Lgc> {
    /// create a new actor
    pub fn new(
        jid: JID,
        password: String,
        receiver: mpsc::Receiver<CoreClientCommand<Lgc::Cmd>>,
        connected: Option<(Connected, SupervisorHandle)>,
        connection_supervisor_shutdown: Fuse<oneshot::Receiver<()>>,
        logic: Lgc,
    ) -> Self {
        Self {
            jid,
            password: Arc::new(password),
            connected,
            receiver,
            connection_supervisor_shutdown,
            logic,
            tasks: JoinSet::new(),
        }
    }

    /// run the actor
    pub async fn run(mut self) {
        loop {
            let msg = tokio::select! {
                // this is okay, as when created the supervisor (and connection) doesn't exist, but a bit messy
                // THIS IS NOT OKAY LOLLLL - apparently fusing is the best option???
                _ = &mut self.connection_supervisor_shutdown => {
                    self.connected = None;
                    continue;
                }
                Some(msg) = self.receiver.recv() => {
                    msg
                },
                else => break,
            };
            match msg {
                CoreClientCommand::Connect => {
                    match self.connected {
                        Some(_) => {
                            self.logic
                                .clone()
                                .handle_connection_error(ConnectionError::AlreadyConnected)
                                .await;
                        }
                        None => {
                            let mut jid = self.jid.clone();
                            let mut domain = jid.domainpart.clone();
                            // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
                            let streams_result =
                                luz::connect_and_login(&mut jid, &*self.password, &mut domain)
                                    .await;
                            match streams_result {
                                Ok(s) => {
                                    debug!("ok stream result");
                                    let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
                                    let (writer, supervisor) = SupervisorHandle::new(
                                        s,
                                        shutdown_send,
                                        jid.clone(),
                                        self.password.clone(),
                                        self.logic.clone(),
                                    );

                                    let shutdown_recv = shutdown_recv.fuse();
                                    self.connection_supervisor_shutdown = shutdown_recv;

                                    let connected = Connected {
                                        jid,
                                        write_handle: writer,
                                    };

                                    self.logic.clone().handle_connect(connected.clone()).await;

                                    self.connected = Some((connected, supervisor));
                                }
                                Err(e) => {
                                    tracing::error!("error: {}", e);
                                    self.logic
                                        .clone()
                                        .handle_connection_error(ConnectionError::ConnectionFailed(
                                            e.into(),
                                        ))
                                        .await;
                                }
                            }
                        }
                    };
                }
                CoreClientCommand::Disconnect => match self.connected {
                    None => {
                        self.logic
                            .clone()
                            .handle_connection_error(ConnectionError::AlreadyDisconnected)
                            .await;
                    }
                    ref mut c => {
                        if let Some((connected, supervisor_handle)) = c.take() {
                            let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await;
                        } else {
                            unreachable!()
                        };
                    }
                },
                CoreClientCommand::Command(command) => {
                    match self.connected.as_ref() {
                        Some((w, s)) => self
                            .tasks
                            .spawn(self.logic.clone().handle_online(command, w.clone())),
                        None => self.tasks.spawn(self.logic.clone().handle_offline(command)),
                    };
                }
            }
        }
    }
}

// TODO: generate methods for each with a macro
pub enum CoreClientCommand<C> {
    // TODO: login invisible xep-0186
    /// connect to XMPP chat server. gets roster and publishes initial presence.
    Connect,
    /// disconnect from XMPP chat server, sending unavailable presence then closing stream.
    Disconnect,
    /// TODO: generics
    Command(C),
}