diff options
| author | 2025-02-10 17:48:39 +0000 | |
|---|---|---|
| committer | 2025-02-10 17:48:39 +0000 | |
| commit | 41c1ba15ef5865f4513db525ed595f3ce903dd26 (patch) | |
| tree | 385a6f758dbf0837e82dee9e8b011db57a93425b | |
| parent | a1d96233e816e2b8378a629c6cc9e34028f2435b (diff) | |
| download | luz-41c1ba15ef5865f4513db525ed595f3ce903dd26.tar.gz luz-41c1ba15ef5865f4513db525ed595f3ce903dd26.tar.bz2 luz-41c1ba15ef5865f4513db525ed595f3ce903dd26.zip | |
WIP: code cleanup
Diffstat (limited to '')
| -rw-r--r-- | luz/src/connection/mod.rs | 166 | ||||
| -rw-r--r-- | luz/src/connection/read.rs | 120 | ||||
| -rw-r--r-- | luz/src/connection/write.rs | 171 | ||||
| -rw-r--r-- | luz/src/error.rs | 32 | ||||
| -rw-r--r-- | luz/src/lib.rs | 577 | 
5 files changed, 584 insertions, 482 deletions
| diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs new file mode 100644 index 0000000..3ad2648 --- /dev/null +++ b/luz/src/connection/mod.rs @@ -0,0 +1,166 @@ +use std::ops::{Deref, DerefMut}; + +use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; +use read::ReadControlHandle; +use sqlx::SqlitePool; +use tokio::{ +    sync::{mpsc, oneshot}, +    task::{JoinHandle, JoinSet}, +}; +use write::{WriteControlHandle, WriteHandle, WriteMessage}; + +use crate::UpdateMessage; + +mod read; +pub(crate) mod write; + +pub struct Supervisor { +    connection_commands: mpsc::Receiver<SupervisorCommand>, +    writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>, +    reader_crash: oneshot::Receiver<( +        SqlitePool, +        mpsc::Sender<UpdateMessage>, +        tokio::task::JoinSet<()>, +    )>, +    sender: mpsc::Sender<UpdateMessage>, +    writer_handle: WriteControlHandle, +    reader_handle: ReadControlHandle, +    on_shutdown: oneshot::Sender<()>, +} + +pub enum SupervisorCommand { +    Disconnect, +    // for if there was a stream error, require to reconnect +    Reconnect, +} + +impl Supervisor { +    fn new( +        connection_commands: mpsc::Receiver<SupervisorCommand>, +        writer_crash: oneshot::Receiver<(WriteMessage, mpsc::Receiver<WriteMessage>)>, +        reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, +        sender: mpsc::Sender<UpdateMessage>, +        writer_handle: WriteControlHandle, +        reader_handle: ReadControlHandle, +        on_shutdown: oneshot::Sender<()>, +    ) -> Self { +        Self { +            connection_commands, +            writer_crash, +            sender, +            writer_handle, +            reader_handle, +            reader_crash, +            on_shutdown, +        } +    } + +    async fn handle_command_message(&mut self, msg: SupervisorCommand) {} + +    async fn run(mut self) { +        loop { +            tokio::select! { +                Some(msg) = self.connection_commands.recv() => { +                    self.handle_command_message(msg).await; +                }, +                error = &mut self.writer_crash => { + +                }, +                error = &mut self.reader_crash => { + +                }, +                else => break, +            } +        } +        self.on_shutdown.send(()); +    } +} + +pub struct SupervisorHandle { +    sender: SupervisorSender, +    handle: JoinHandle<()>, +} + +impl Deref for SupervisorHandle { +    type Target = SupervisorSender; + +    fn deref(&self) -> &Self::Target { +        &self.sender +    } +} + +impl DerefMut for SupervisorHandle { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.sender +    } +} + +#[derive(Clone)] +pub struct SupervisorSender { +    sender: mpsc::Sender<SupervisorCommand>, +} + +impl Deref for SupervisorSender { +    type Target = mpsc::Sender<SupervisorCommand>; + +    fn deref(&self) -> &Self::Target { +        &self.sender +    } +} + +impl DerefMut for SupervisorSender { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.sender +    } +} + +impl SupervisorHandle { +    pub fn new( +        streams: BoundJabberStream<Tls>, +        update_sender: mpsc::Sender<UpdateMessage>, +        db: SqlitePool, +        on_shutdown: oneshot::Sender<()>, +    ) -> (WriteHandle, Self) { +        let (command_sender, command_receiver) = mpsc::channel(20); +        let (writer_error_sender, writer_error_receiver) = oneshot::channel(); +        let (reader_crash_sender, reader_crash_receiver) = oneshot::channel(); + +        let (reader, writer) = streams.split(); +        let (write_handle, write_control_handle) = +            WriteControlHandle::new(writer, writer_error_sender); +        let jabber_reader_control_handle = ReadControlHandle::new( +            reader, +            reader_crash_sender, +            db, +            update_sender.clone(), +            command_sender.clone(), +            write_handle.clone(), +        ); + +        let actor = Supervisor::new( +            command_receiver, +            writer_error_receiver, +            reader_crash_receiver, +            update_sender, +            write_control_handle, +            jabber_reader_control_handle, +            on_shutdown, +        ); + +        let handle = tokio::spawn(async move { actor.run().await }); + +        ( +            write_handle, +            Self { +                sender: SupervisorSender { +                    sender: command_sender, +                }, +                handle, +            }, +        ) +    } + +    pub fn sender(&self) -> SupervisorSender { +        self.sender.clone() +    } +} diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs new file mode 100644 index 0000000..7800d56 --- /dev/null +++ b/luz/src/connection/read.rs @@ -0,0 +1,120 @@ +use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use sqlx::SqlitePool; +use stanza::client::Stanza; +use tokio::{ +    sync::{mpsc, oneshot}, +    task::{JoinHandle, JoinSet}, +}; + +use crate::UpdateMessage; + +use super::{ +    write::{WriteHandle, WriteMessage}, +    SupervisorCommand, +}; + +pub struct Read { +    // TODO: place iq hashmap here +    control_receiver: mpsc::Receiver<ReadControl>, +    stream: BoundJabberReader<Tls>, +    on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, +    db: SqlitePool, +    update_sender: mpsc::Sender<UpdateMessage>, +    supervisor_control: mpsc::Sender<SupervisorCommand>, +    write_handle: WriteHandle, +    tasks: JoinSet<()>, +} + +impl Read { +    fn new( +        control_receiver: mpsc::Receiver<ReadControl>, +        stream: BoundJabberReader<Tls>, +        on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, +        db: SqlitePool, +        update_sender: mpsc::Sender<UpdateMessage>, +        // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) +        supervisor_control: mpsc::Sender<SupervisorCommand>, +        write_sender: WriteHandle, +    ) -> Self { +        Self { +            control_receiver, +            stream, +            on_crash, +            db, +            update_sender, +            supervisor_control, +            write_handle: write_sender, +            tasks: JoinSet::new(), +        } +    } + +    async fn run(mut self) { +        loop { +            tokio::select! { +                Some(msg) = self.control_receiver.recv() => { +                    match msg { +                        ReadControl::Disconnect => todo!(), +                        ReadControl::Abort(sender) => todo!(), +                    }; +                }, +                stanza = self.stream.read::<Stanza>() => { +                    match stanza { +                        Ok(_) => todo!(), +                        Err(_) => todo!(), +                    } +                    self.tasks.spawn(); +                }, +                else => break +            } +        } +    } +} + +trait Task { +    async fn handle(); +} + +impl Task for Stanza { +    async fn handle() { +        todo!() +    } +} + +enum ReadControl { +    Disconnect, +    Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>), +} + +pub struct ReadControlHandle { +    sender: mpsc::Sender<ReadControl>, +    handle: JoinHandle<()>, +} + +impl ReadControlHandle { +    pub fn new( +        stream: BoundJabberReader<Tls>, +        on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, +        db: SqlitePool, +        sender: mpsc::Sender<UpdateMessage>, +        supervisor_control: mpsc::Sender<SupervisorCommand>, +        jabber_write: WriteHandle, +    ) -> Self { +        let (control_sender, control_receiver) = mpsc::channel(20); + +        let actor = Read::new( +            control_receiver, +            stream, +            on_crash, +            db, +            sender, +            supervisor_control, +            jabber_write, +        ); +        let handle = tokio::spawn(async move { actor.run().await }); + +        Self { +            sender: control_sender, +            handle, +        } +    } +} diff --git a/luz/src/connection/write.rs b/luz/src/connection/write.rs new file mode 100644 index 0000000..9c01519 --- /dev/null +++ b/luz/src/connection/write.rs @@ -0,0 +1,171 @@ +use std::ops::{Deref, DerefMut}; + +use jabber::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; +use stanza::client::Stanza; +use tokio::{ +    sync::{mpsc, oneshot}, +    task::JoinHandle, +}; + +use crate::error::Error; + +// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. +pub struct Write { +    stanza_receiver: mpsc::Receiver<WriteMessage>, +    control_receiver: mpsc::Receiver<WriteControl>, +    stream: BoundJabberWriter<Tls>, +    on_crash: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>, +} + +pub struct WriteMessage { +    stanza: Stanza, +    respond_to: oneshot::Sender<Result<(), Error>>, +} + +enum WriteControl { +    Disconnect, +    Abort(oneshot::Sender<mpsc::Receiver<WriteMessage>>), +} + +impl Write { +    fn new( +        stanza_receiver: mpsc::Receiver<WriteMessage>, +        control_receiver: mpsc::Receiver<WriteControl>, +        stream: BoundJabberWriter<Tls>, +        supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>, +    ) -> Self { +        Self { +            stanza_receiver, +            control_receiver, +            stream, +            on_crash: supervisor, +        } +    } + +    async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> { +        Ok(self.stream.write(stanza).await?) +    } + +    async fn run(mut self) { +        loop { +            tokio::select! { +                Some(msg) = self.control_receiver.recv() => { +                    match msg { +                        WriteControl::Disconnect => { +                            // TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send +                            self.stanza_receiver.close(); +                            // TODO: put this in some kind of function to avoid code duplication +                            for msg in self.stanza_receiver.recv().await { +                                let result = self.stream.write(&msg.stanza).await; +                                match result { +                                    Err(e) => match &e { +                                        peanuts::Error::ReadError(error) => { +                                            // make sure message is not lost from error, supervisor handles retry and reporting +                                            self.on_crash.send((msg, self.stanza_receiver)); +                                            break; +                                        } +                                        _ => { +                                            msg.respond_to.send(Err(e.into())); +                                        } +                                    }, +                                    _ => { +                                        msg.respond_to.send(Ok(())); +                                    } +                                } +                            } +                            self.stream.try_close().await; +                            break; +                        }, +                        WriteControl::Abort(sender) => { +                            sender.send(self.stanza_receiver); +                            break; +                        }, +                    } +                }, +                Some(msg) = self.stanza_receiver.recv() => { +                    let result = self.stream.write(&msg.stanza).await; +                    match result { +                        Err(e) => match &e { +                            peanuts::Error::ReadError(error) => { +                                // make sure message is not lost from error, supervisor handles retry and reporting +                                self.on_crash.send((msg, self.stanza_receiver)); +                                break; +                            } +                            _ => { +                                msg.respond_to.send(Err(e.into())); +                            } +                        }, +                        _ => { +                            msg.respond_to.send(Ok(())); +                        } +                    } +                }, +                // TODO: check if this is ok to do +                else => break, +            } +        } +    } +} + +#[derive(Clone)] +pub struct WriteHandle { +    sender: mpsc::Sender<WriteMessage>, +} + +impl Deref for WriteHandle { +    type Target = mpsc::Sender<WriteMessage>; + +    fn deref(&self) -> &Self::Target { +        &self.sender +    } +} + +impl DerefMut for WriteHandle { +    fn deref_mut(&mut self) -> &mut Self::Target { +        &mut self.sender +    } +} + +pub struct WriteControlHandle { +    sender: mpsc::Sender<WriteControl>, +    handle: JoinHandle<()>, +} + +impl WriteControlHandle { +    pub fn new( +        stream: BoundJabberWriter<Tls>, +        supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>, +    ) -> (WriteHandle, Self) { +        let (control_sender, control_receiver) = mpsc::channel(20); +        let (stanza_sender, stanza_receiver) = mpsc::channel(20); + +        let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor); +        let handle = tokio::spawn(async move { actor.run().await }); + +        ( +            WriteHandle { +                sender: stanza_sender, +            }, +            Self { +                sender: control_sender, +                handle, +            }, +        ) +    } + +    pub fn reconnect( +        stream: BoundJabberWriter<Tls>, +        supervisor: oneshot::Sender<(WriteMessage, mpsc::Receiver<WriteMessage>)>, +        stanza_receiver: mpsc::Receiver<WriteMessage>, +    ) -> Self { +        let (control_sender, control_receiver) = mpsc::channel(20); + +        let actor = Write::new(stanza_receiver, control_receiver, stream, supervisor); +        let handle = tokio::spawn(async move { actor.run().await }); + +        Self { +            sender: control_sender, +            handle, +        } +    } +} diff --git a/luz/src/error.rs b/luz/src/error.rs new file mode 100644 index 0000000..d9dfaba --- /dev/null +++ b/luz/src/error.rs @@ -0,0 +1,32 @@ +pub enum Error { +    AlreadyConnected, +    Jabber(jabber::Error), +    XML(peanuts::Error), +    SQL(sqlx::Error), +    JID(jid::ParseError), +    AlreadyDisconnected, +} + +impl From<peanuts::Error> for Error { +    fn from(e: peanuts::Error) -> Self { +        Self::XML(e) +    } +} + +impl From<jid::ParseError> for Error { +    fn from(e: jid::ParseError) -> Self { +        Self::JID(e) +    } +} + +impl From<sqlx::Error> for Error { +    fn from(e: sqlx::Error) -> Self { +        Self::SQL(e) +    } +} + +impl From<jabber::Error> for Error { +    fn from(e: jabber::Error) -> Self { +        Self::Jabber(e) +    } +} diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 6372fe0..0dfc30c 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -1,301 +1,33 @@ -use std::{ -    collections::{HashMap, HashSet, VecDeque}, -    fmt::Pointer, -    pin::pin, -    sync::{atomic::AtomicBool, Arc}, -    task::{ready, Poll}, -}; +use std::sync::Arc; -use futures::{ -    stream::{SplitSink, SplitStream}, -    Sink, SinkExt, Stream, StreamExt, -}; -use jabber::{ -    connection::Tls, -    jabber_stream::bound_stream::{BoundJabberReader, BoundJabberStream, BoundJabberWriter}, -    JID, -}; -use sqlx::{query, Pool, Sqlite, SqlitePool}; -use stanza::{ -    client::{ -        iq::{Iq, IqType, Query}, -        Stanza, -    }, -    roster, -}; +use connection::SupervisorSender; +use jabber::JID; +use sqlx::SqlitePool; +use stanza::roster;  use tokio::{ -    io::AsyncRead, -    select, -    sync::{ -        mpsc::{self, Receiver, Sender}, -        oneshot, Mutex, -    }, -    task::{JoinHandle, JoinSet}, +    sync::{mpsc, oneshot, Mutex}, +    task::JoinSet,  }; -use tokio_stream::wrappers::ReceiverStream; -use tokio_util::sync::{PollSendError, PollSender}; - -// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream. -pub struct JabberWriter { -    stanza_receiver: mpsc::Receiver<JabberWrite>, -    control_receiver: mpsc::Receiver<JabberWriterControl>, -    stream: BoundJabberWriter<Tls>, -    on_crash: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>, -} - -struct JabberWrite { -    stanza: Stanza, -    respond_to: oneshot::Sender<Result<(), Error>>, -} - -enum JabberWriterControl { -    Disconnect, -    Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>), -} - -impl JabberWriter { -    fn new( -        stanza_receiver: mpsc::Receiver<JabberWrite>, -        control_receiver: mpsc::Receiver<JabberWriterControl>, -        stream: BoundJabberWriter<Tls>, -        supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>, -    ) -> Self { -        Self { -            stanza_receiver, -            control_receiver, -            stream, -            on_crash: supervisor, -        } -    } - -    async fn write(&mut self, stanza: &Stanza) -> Result<(), peanuts::Error> { -        Ok(self.stream.write(stanza).await?) -    } - -    async fn run(mut self) { -        loop { -            tokio::select! { -                Some(msg) = self.control_receiver.recv() => { -                    match msg { -                        JabberWriterControl::Disconnect => { -                            // TODO: close the stanza_receiver channel and drain out all of the remaining stanzas to send -                            self.stanza_receiver.close(); -                            // TODO: put this in some kind of function to avoid code duplication -                            for msg in self.stanza_receiver.recv().await { -                                let result = self.stream.write(&msg.stanza).await; -                                match result { -                                    Err(e) => match &e { -                                        peanuts::Error::ReadError(error) => { -                                            // make sure message is not lost from error, supervisor handles retry and reporting -                                            self.on_crash.send((msg, self.stanza_receiver)); -                                            break; -                                        } -                                        _ => { -                                            msg.respond_to.send(Err(e.into())); -                                        } -                                    }, -                                    _ => { -                                        msg.respond_to.send(Ok(())); -                                    } -                                } -                            } -                            self.stream.try_close().await; -                            break; -                        }, -                        JabberWriterControl::Abort(sender) => { -                            sender.send(self.stanza_receiver); -                            break; -                        }, -                    } -                }, -                Some(msg) = self.stanza_receiver.recv() => { -                    let result = self.stream.write(&msg.stanza).await; -                    match result { -                        Err(e) => match &e { -                            peanuts::Error::ReadError(error) => { -                                // make sure message is not lost from error, supervisor handles retry and reporting -                                self.on_crash.send((msg, self.stanza_receiver)); -                                break; -                            } -                            _ => { -                                msg.respond_to.send(Err(e.into())); -                            } -                        }, -                        _ => { -                            msg.respond_to.send(Ok(())); -                        } -                    } -                }, -                // TODO: check if this is ok to do -                else => break, -            } -        } -    } -} - -#[derive(Clone)] -pub struct JabberWriteHandle { -    sender: mpsc::Sender<JabberWrite>, -} - -pub struct JabberWriterControlHandle { -    sender: mpsc::Sender<JabberWriterControl>, -    handle: JoinHandle<()>, -} - -impl JabberWriterControlHandle { -    pub fn new( -        stream: BoundJabberWriter<Tls>, -        supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>, -    ) -> (JabberWriteHandle, JabberWriterControlHandle) { -        let (control_sender, control_receiver) = mpsc::channel(20); -        let (stanza_sender, stanza_receiver) = mpsc::channel(20); - -        let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor); -        let handle = tokio::spawn(async move { actor.run().await }); - -        ( -            JabberWriteHandle { -                sender: stanza_sender, -            }, -            Self { -                sender: control_sender, -                handle, -            }, -        ) -    } - -    pub fn reconnect( -        stream: BoundJabberWriter<Tls>, -        supervisor: oneshot::Sender<(JabberWrite, mpsc::Receiver<JabberWrite>)>, -        stanza_receiver: mpsc::Receiver<JabberWrite>, -    ) -> Self { -        let (control_sender, control_receiver) = mpsc::channel(20); - -        let actor = JabberWriter::new(stanza_receiver, control_receiver, stream, supervisor); -        let handle = tokio::spawn(async move { actor.run().await }); - -        Self { -            sender: control_sender, -            handle, -        } -    } -} - -pub struct JabberReader { -    // TODO: place iq hashmap here -    control_receiver: mpsc::Receiver<JabberReaderControl>, -    stream: BoundJabberReader<Tls>, -    on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, -    db: SqlitePool, -    sender: mpsc::Sender<UpdateMessage>, -    supervisor_control: mpsc::Sender<JabberSupervisorCommand>, -    write_sender: mpsc::Sender<JabberWrite>, -    tasks: JoinSet<()>, -} - -impl JabberReader { -    fn new( -        control_receiver: mpsc::Receiver<JabberReaderControl>, -        stream: BoundJabberReader<Tls>, -        on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, -        db: SqlitePool, -        sender: mpsc::Sender<UpdateMessage>, -        // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs) -        supervisor_control: mpsc::Sender<JabberSupervisorCommand>, -        write_sender: mpsc::Sender<JabberWrite>, -    ) -> Self { -        Self { -            control_receiver, -            stream, -            on_crash, -            db, -            sender, -            supervisor_control, -            write_sender, -            tasks: JoinSet::new(), -        } -    } - -    async fn run(mut self) { -        loop { -            tokio::select! { -                Some(msg) = self.control_receiver.recv() => { -                    match msg { -                        JabberReaderControl::Disconnect => todo!(), -                        JabberReaderControl::Abort(sender) => todo!(), -                    }; -                }, -                stanza = self.stream.read::<Stanza>() => { -                    match stanza { -                        Ok(_) => todo!(), -                        Err(_) => todo!(), -                    } -                    self.tasks.spawn(); -                }, -                else => break -            } -        } -    } -} - -trait Task { -    async fn handle(); -} - -impl Task for Stanza { -    async fn handle() { -        todo!() -    } -} - -enum JabberReaderControl { -    Disconnect, -    Abort(oneshot::Sender<mpsc::Receiver<JabberWrite>>), -} - -struct JabberReaderControlHandle { -    sender: mpsc::Sender<JabberReaderControl>, -    handle: JoinHandle<()>, -} - -impl JabberReaderControlHandle { -    pub fn new( -        stream: BoundJabberReader<Tls>, -        on_crash: oneshot::Sender<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, -        db: SqlitePool, -        sender: mpsc::Sender<UpdateMessage>, -        supervisor_control: mpsc::Sender<JabberSupervisorCommand>, -        jabber_write: mpsc::Sender<JabberWrite>, -    ) -> Self { -        let (control_sender, control_receiver) = mpsc::channel(20); -        let actor = JabberReader::new( -            control_receiver, -            stream, -            on_crash, -            db, -            sender, -            supervisor_control, -            jabber_write, -        ); -        let handle = tokio::spawn(async move { actor.run().await }); +use crate::connection::write::WriteHandle; +use crate::connection::{SupervisorCommand, SupervisorHandle}; +use crate::error::Error; -        Self { -            sender: control_sender, -            handle, -        } -    } -} +mod connection; +mod error;  pub struct Luz {      receiver: mpsc::Receiver<CommandMessage>,      jid: Arc<Mutex<JID>>,      // TODO: use a dyn passwordprovider trait to avoid storing password in memory      password: String, -    connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>, +    connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>,      db: SqlitePool,      sender: mpsc::Sender<UpdateMessage>, +    /// if connection was shut down due to e.g. server shutdown, supervisor must be able to mark client as disconnected +    connection_supervisor_shutdown: oneshot::Receiver<()>, +    // TODO: will need to have an auto reconnect state as well (e.g. in case server shut down, to try and reconnect later) +    // TODO: will grow forever at this point, maybe not required as tasks will naturally shut down anyway?      tasks: JoinSet<()>,  } @@ -304,7 +36,8 @@ impl Luz {          receiver: mpsc::Receiver<CommandMessage>,          jid: Arc<Mutex<JID>>,          password: String, -        connected: Arc<Mutex<Option<(JabberWriteHandle, JabberSupervisorHandle)>>>, +        connected: Arc<Mutex<Option<(WriteHandle, SupervisorHandle)>>>, +        connection_supervisor_shutdown: oneshot::Receiver<()>,          db: SqlitePool,          sender: mpsc::Sender<UpdateMessage>,      ) -> Self { @@ -316,71 +49,87 @@ impl Luz {              receiver,              sender,              tasks: JoinSet::new(), +            connection_supervisor_shutdown,          }      }      async fn run(mut self) { -        while let Some(msg) = self.receiver.recv().await { -            // TODO: consider separating disconnect/connect and commands apart from commandmessage -            match msg { -                CommandMessage::Connect => { -                    match self.connected.lock().await.as_ref() { -                        Some(_) => { -                            self.sender -                                .send(UpdateMessage::Error(Error::AlreadyConnected)) -                                .await; -                        } -                        None => { -                            let mut jid = self.jid.lock().await; -                            let mut domain = jid.domainpart.clone(); -                            let streams_result = -                                jabber::connect_and_login(&mut jid, &self.password, &mut domain) -                                    .await; -                            match streams_result { -                                Ok(s) => { -                                    let (writer, supervisor) = -                                        JabberSupervisorHandle::new(s, self.sender.clone()); -                                    *self.connected.lock().await = Some((writer, supervisor)); +        loop { +            tokio::select! { +                _ = &mut self.connection_supervisor_shutdown => { +                    *self.connected.lock().await = None +                } +                Some(msg) = self.receiver.recv() => { +                    // TODO: consider separating disconnect/connect and commands apart from commandmessage +                    // TODO: dispatch commands separate tasks +                    match msg { +                        CommandMessage::Connect => { +                            match self.connected.lock().await.as_ref() { +                                Some(_) => { +                                    self.sender +                                        .send(UpdateMessage::Error(Error::AlreadyConnected)) +                                        .await;                                  } -                                Err(e) => { -                                    self.sender.send(UpdateMessage::Error(e.into())); +                                None => { +                                    let mut jid = self.jid.lock().await; +                                    let mut domain = jid.domainpart.clone(); +                                    // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource) +                                    let streams_result = +                                        jabber::connect_and_login(&mut jid, &self.password, &mut domain) +                                            .await; +                                    match streams_result { +                                        Ok(s) => { +                                            let (shutdown_send, shutdown_recv) = oneshot::channel::<()>(); +                                            let (writer, supervisor) = SupervisorHandle::new( +                                                s, +                                                self.sender.clone(), +                                                self.db.clone(), +                                                shutdown_send, +                                            ); +                                            self.connection_supervisor_shutdown = shutdown_recv; +                                            *self.connected.lock().await = Some((writer, supervisor)); +                                        } +                                        Err(e) => { +                                            self.sender.send(UpdateMessage::Error(e.into())); +                                        } +                                    }                                  } +                            }; +                        } +                        CommandMessage::Disconnect => match self.connected.lock().await.as_mut() { +                            None => { +                                self.sender +                                    .send(UpdateMessage::Error(Error::AlreadyDisconnected)) +                                    .await; +                            } +                            mut c => { +                                if let Some((_write_handle, supervisor_handle)) = c.take() { +                                    let _ = supervisor_handle.send(SupervisorCommand::Disconnect).await; +                                } else { +                                    unreachable!() +                                };                              } +                        }, +                        _ => { +                            match self.connected.lock().await.as_ref() { +                                Some((w, s)) => self.tasks.spawn(msg.handle_online( +                                    w.clone(), +                                    s.sender(), +                                    self.jid.clone(), +                                    self.db.clone(), +                                    self.sender.clone(), +                                    // TODO: iq hashmap +                                )), +                                None => self.tasks.spawn(msg.handle_offline( +                                    self.jid.clone(), +                                    self.db.clone(), +                                    self.sender.clone(), +                                )), +                            };                          } -                    }; -                } -                CommandMessage::Disconnect => match self.connected.lock().await.as_mut() { -                    None => { -                        self.sender -                            .send(UpdateMessage::Error(Error::AlreadyDisonnected)) -                            .await; -                    } -                    mut c => { -                        if let Some((_write_handle, supervisor_handle)) = c.take() { -                            let _ = supervisor_handle -                                .sender -                                .send(JabberSupervisorCommand::Disconnect) -                                .await; -                        } else { -                            unreachable!() -                        };                      }                  }, -                _ => { -                    match self.connected.lock().await.as_ref() { -                        Some((w, _)) => self.tasks.spawn(msg.handle_online( -                            w.clone(), -                            self.jid.clone(), -                            self.db.clone(), -                            self.sender.clone(), -                        )), -                        None => self.tasks.spawn(msg.handle_offline( -                            self.jid.clone(), -                            self.db.clone(), -                            self.sender.clone(), -                        )), -                    }; -                } +                else => break,              }          }      } @@ -398,7 +147,8 @@ impl CommandMessage {      pub async fn handle_online(          mut self, -        jabber_write_handle: JabberWriteHandle, +        write_handle: WriteHandle, +        supervisor_control: SupervisorSender,          // TODO: jid could lose resource by the end          jid: Arc<Mutex<JID>>,          db: SqlitePool, @@ -418,12 +168,15 @@ impl LuzHandle {      pub fn new(jid: JID, password: String, db: SqlitePool) -> Self {          let (command_sender, command_receiver) = mpsc::channel(20);          let (update_sender, update_receiver) = mpsc::channel(20); +        // might be bad, first supervisor shutdown notification oneshot is never used (disgusting) +        let (sup_send, sup_recv) = oneshot::channel();          let actor = Luz::new(              command_receiver,              Arc::new(Mutex::new(jid)),              password,              Arc::new(Mutex::new(None)), +            sup_recv,              db,              update_sender,          ); @@ -436,146 +189,6 @@ impl LuzHandle {      }  } -pub struct JabberSupervisor { -    connection_commands: mpsc::Receiver<JabberSupervisorCommand>, -    writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>, -    reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, -    sender: mpsc::Sender<UpdateMessage>, -    writer_handle: JabberWriterControlHandle, -    reader_handle: JabberReaderControlHandle, -} - -pub enum JabberSupervisorCommand { -    Disconnect, -} - -impl JabberSupervisor { -    fn new( -        connection_commands: mpsc::Receiver<JabberSupervisorCommand>, -        writer_crash: oneshot::Receiver<(JabberWrite, mpsc::Receiver<JabberWrite>)>, -        reader_crash: oneshot::Receiver<(SqlitePool, mpsc::Sender<UpdateMessage>, JoinSet<()>)>, -        sender: mpsc::Sender<UpdateMessage>, -        writer_handle: JabberWriterControlHandle, -        reader_handle: JabberReaderControlHandle, -    ) -> Self { -        Self { -            connection_commands, -            writer_crash, -            sender, -            writer_handle, -            reader_handle, -            reader_crash, -        } -    } - -    async fn handle_command_message(&mut self, msg: JabberSupervisorCommand) {} - -    async fn run(mut self) { -        loop { -            tokio::select! { -                Some(msg) = self.connection_commands.recv() => { -                    self.handle_command_message(msg).await; -                }, -                error = self.writer_crash => { - -                }, -                error = self.reader_crash => { - -                }, -            } -        } -    } -} - -pub struct JabberSupervisorHandle { -    sender: mpsc::Sender<JabberSupervisorCommand>, -    handle: JoinHandle<()>, -} - -impl JabberSupervisorHandle { -    pub fn new( -        streams: BoundJabberStream<Tls>, -        sender: mpsc::Sender<UpdateMessage>, -        db: SqlitePool, -        update_sender: mpsc::Sender<UpdateMessage>, -    ) -> (JabberWriteHandle, Self) { -        let (command_sender, command_receiver) = mpsc::channel(20); -        let (writer_error_sender, writer_error_receiver) = oneshot::channel(); -        let (reader_crash_sender, reader_crash_receiver) = oneshot::channel(); - -        let (reader, writer) = streams.split(); -        let (jabber_write_handle, jabber_writer_control_handle) = -            JabberWriterControlHandle::new(writer, writer_error_sender); -        let jabber_reader_control_handle = JabberReaderControlHandle::new( -            reader, -            reader_crash_sender, -            db, -            update_sender, -            command_sender.clone(), -            jabber_write_handle.sender.clone(), -        ); - -        let actor = JabberSupervisor::new( -            command_receiver, -            writer_error_receiver, -            reader_crash_receiver, -            sender, -            jabber_writer_control_handle, -            jabber_reader_control_handle, -        ); - -        let handle = tokio::spawn(async move { actor.run().await }); - -        ( -            jabber_write_handle, -            Self { -                sender: command_sender, -                handle, -            }, -        ) -    } -} - -pub enum Error { -    AlreadyConnected, -    PollSend(PollSendError<CommandMessage>), -    Jabber(jabber::Error), -    XML(peanuts::Error), -    SQL(sqlx::Error), -    JID(jid::ParseError), -    AlreadyDisonnected, -} - -impl From<peanuts::Error> for Error { -    fn from(e: peanuts::Error) -> Self { -        Self::XML(e) -    } -} - -impl From<jid::ParseError> for Error { -    fn from(e: jid::ParseError) -> Self { -        Self::JID(e) -    } -} - -impl From<sqlx::Error> for Error { -    fn from(e: sqlx::Error) -> Self { -        Self::SQL(e) -    } -} - -impl From<jabber::Error> for Error { -    fn from(e: jabber::Error) -> Self { -        Self::Jabber(e) -    } -} - -impl From<PollSendError<CommandMessage>> for Error { -    fn from(e: PollSendError<CommandMessage>) -> Self { -        Self::PollSend(e) -    } -} -  pub enum CommandMessage {      Connect,      Disconnect, | 
