diff options
Diffstat (limited to '')
| -rw-r--r-- | TODO.md | 42 | ||||
| -rw-r--r-- | luz/src/connection/mod.rs | 80 | ||||
| -rw-r--r-- | luz/src/connection/read.rs | 1 | ||||
| -rw-r--r-- | luz/src/lib.rs | 1 | 
4 files changed, 105 insertions, 19 deletions
| @@ -2,22 +2,36 @@  ## next -ci/cd: doc generation -feature: error handling on stream according to rfc6120 -docs: jid -docs: jabber -docs: starttls -docs: sasl -docs: resource binding -feature: starttls -feature: sasl -feature: resource binding - -## in progress - -feature: logging +feat(luz): everything in rfc6120 and rfc6121 +  feat(luz): handle_online +  feat(luz): handle_offline +  feat(luz): handle_stanza +  feat(luz): database +  feat(luz): error handling on stream according to rfc6120 +  feat(luz): send message +  feat(luz): receive message +  feat(luz): retreive messages stored in database +  feat(luz): get roster (online and offline) +  feat(luz): set roster +  feat(luz): reconnect supervisorcommand +feat: thiserror everywhere +feat(luz): proper stanza ids +test: proper tests +ci: doc generation +docs(jid): jid +feat(peanuts): derive macros for IntoElement and FromElement +docs(jabber): connection, starttls, sasl, binding, bound_stream, etc. +feat: proper logging for everything basically +feat(luz): passwordprovider trait, to avoid storing password in struct +feat(luz): auto-reconnect state stored in luz actor, for if e.g. server shut down +refactor(luz): dealing properly with all the joinsets and joinhandles +feat(peanuts): some kind of way to configure the reader and writer to log the raw xml written to the stream, probably by having a method that allows you to add a log writer to them. will need to investigate some kind of log namespacing. +feat(jabber): storing resource within the bound_stream connection  ## done +feature: starttls +feature: sasl +feature: resource binding  feature: jabber client connection  feature: jid diff --git a/luz/src/connection/mod.rs b/luz/src/connection/mod.rs index f8cf18b..d05eb62 100644 --- a/luz/src/connection/mod.rs +++ b/luz/src/connection/mod.rs @@ -46,7 +46,21 @@ pub enum SupervisorCommand {      Disconnect,      // for if there was a stream error, require to reconnect      // couldn't stream errors just cause a crash? lol -    Reconnect, +    Reconnect(State), +} + +pub enum State { +    Write(mpsc::Receiver<WriteMessage>), +    Read( +        ( +            SqlitePool, +            mpsc::Sender<UpdateMessage>, +            tokio::task::JoinSet<()>, +            mpsc::Sender<SupervisorCommand>, +            WriteHandle, +            Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Error>>>>>, +        ), +    ),  }  impl Supervisor { @@ -101,10 +115,70 @@ impl Supervisor {                              }                              break;                          }, -                        SupervisorCommand::Reconnect => { +                        SupervisorCommand::Reconnect(state) => {                              // TODO: please omfg                              // send abort to read stream, as already done, consider -                            todo!() +                            let (read_state, mut write_state); +                            match state { +                                // TODO: proper state things for read and write thread +                                State::Write(receiver) => { +                                    write_state = receiver; +                                    let (send, recv) = oneshot::channel(); +                                    let _ = self.reader_handle.send(ReadControl::Abort(send)).await; +                                    if let Ok(state) = recv.await { +                                        read_state = state; +                                    } else { +                                        break +                                    } +                                }, +                                State::Read(read) => { +                                    read_state = read; +                                    let (send, recv) = oneshot::channel(); +                                    let _ = self.writer_handle.send(WriteControl::Abort(send)).await; +                                    // TODO: need a tokio select, in case the state arrives from somewhere else +                                    if let Ok(state) = recv.await { +                                        write_state = state; +                                    } else { +                                        break +                                    } +                                }, +                            } + +                            let mut jid = self.jid.lock().await; +                            let mut domain = jid.domainpart.clone(); +                            let connection = jabber::connect_and_login(&mut jid, &*self.password, &mut domain).await; +                            match connection { +                                Ok(c) => { + +                                    let (read, write) = c.split(); +                                    let (send, recv) = oneshot::channel(); +                                    self.writer_crash = recv; +                                    self.writer_handle = +                                        WriteControlHandle::reconnect(write, send, write_state); +                                    let (send, recv) = oneshot::channel(); +                                    self.reader_crash = recv; +                                    let (db, update_sender, tasks, supervisor_command, write_sender, pending_iqs) = read_state; +                                    self.reader_handle = ReadControlHandle::reconnect( +                                        read, +                                        send, +                                        db, +                                        update_sender, +                                        supervisor_command, +                                        write_sender, +                                        tasks, +                                        pending_iqs, +                                    ); +                                }, +                                Err(e) => { +                                    // if reconnection failure, respond to all current write messages with lost connection error. the received processes should complete themselves. +                                    write_state.close(); +                                    while let Some(msg) = write_state.recv().await { +                                        let _ = msg.respond_to.send(Err(Error::LostConnection)); +                                    } +                                    let _ = self.sender.send(UpdateMessage::Error(e.into())).await; +                                    break; +                                }, +                            }                          },                      }                  }, diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs index 3c61780..c2828ad 100644 --- a/luz/src/connection/read.rs +++ b/luz/src/connection/read.rs @@ -21,7 +21,6 @@ use super::{  };  pub struct Read { -    // TODO: place iq hashmap here      control_receiver: mpsc::Receiver<ReadControl>,      stream: BoundJabberReader<Tls>,      on_crash: oneshot::Sender<( diff --git a/luz/src/lib.rs b/luz/src/lib.rs index 4d36a81..4c5a841 100644 --- a/luz/src/lib.rs +++ b/luz/src/lib.rs @@ -138,7 +138,6 @@ impl Luz {                                      self.jid.clone(),                                      self.db.clone(),                                      self.sender.clone(), -                                    // TODO: iq hashmap                                      self.pending_iqs.clone()                                  )),                                  None => self.tasks.spawn(msg.handle_offline( | 
