diff options
Diffstat (limited to 'lampada/src')
| -rw-r--r-- | lampada/src/connection/mod.rs | 21 | ||||
| -rw-r--r-- | lampada/src/connection/read.rs | 28 | ||||
| -rw-r--r-- | lampada/src/connection/write.rs | 33 | 
3 files changed, 49 insertions, 33 deletions
| diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs index 2d570ae..a3dde16 100644 --- a/lampada/src/connection/mod.rs +++ b/lampada/src/connection/mod.rs @@ -8,7 +8,7 @@ use std::{  };  use jid::JID; -use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream}; +use luz::jabber_stream::bound_stream::BoundJabberStream;  use read::{ReadControl, ReadControlHandle, ReadState};  use stanza::{client::Stanza, stream_error::Error as StreamError};  use tokio::{ @@ -86,13 +86,13 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {                              info!("sent disconnect command");                              tokio::select! {                                  _ = async { tokio::join!( -                                    async { let _ = (&mut self.write_control_handle.handle).await; }, -                                    async { let _ = (&mut self.read_control_handle.handle).await; } +                                    // async { let _ = (&mut self.write_control_handle.handle).await; }, +                                    // async { let _ = (&mut self.read_control_handle.handle).await; }                                  ) } => {},                                  // TODO: config timeout                                  _ = async { tokio::time::sleep(Duration::from_secs(5)) } => { -                                    (&mut self.read_control_handle.handle).abort(); -                                    (&mut self.write_control_handle.handle).abort(); +                                    // (&mut self.read_control_handle.handle).abort(); +                                    // (&mut self.write_control_handle.handle).abort();                                  }                              }                              info!("disconnected"); @@ -257,7 +257,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {  pub struct SupervisorHandle {      sender: SupervisorSender, -    handle: JoinHandle<()>, +    // TODO: is not having handles fine? +    // handle: JoinHandle<()>,  }  impl Deref for SupervisorHandle { @@ -295,7 +296,7 @@ impl DerefMut for SupervisorSender {  impl SupervisorHandle {      pub fn new<Lgc: Logic + Clone + Send + 'static>( -        streams: BoundJabberStream<Tls>, +        streams: BoundJabberStream,          on_crash: oneshot::Sender<()>,          jid: JID,          server: JID, @@ -341,13 +342,15 @@ impl SupervisorHandle {              logic,          ); -        let handle = tokio::spawn(async move { actor.run().await }); +        #[cfg(target_arch = "wasm32")] +        wasm_bindgen_futures::spawn_local(async move { actor.run().await }); +        #[cfg(not(target_arch = "wasm32"))] +        tokio::spawn(async move { actor.run().await });          (              write_handle,              Self {                  sender: supervisor_sender, -                handle,              },          )      } diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs index 2c7eb58..4451d99 100644 --- a/lampada/src/connection/read.rs +++ b/lampada/src/connection/read.rs @@ -8,7 +8,7 @@ use std::{  };  use futures::{future::Fuse, FutureExt}; -use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader}; +use luz::jabber_stream::bound_stream::BoundJabberReader;  use stanza::client::Stanza;  use stanza::stream::Error as StreamErrorStanza;  use stanza::stream_error::Error as StreamError; @@ -24,7 +24,7 @@ use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};  /// read actor  pub struct Read<Lgc> { -    stream: BoundJabberReader<Tls>, +    stream: BoundJabberReader,      disconnecting: bool,      disconnect_timedout: Fuse<oneshot::Receiver<()>>, @@ -51,7 +51,7 @@ pub struct ReadState {  impl<Lgc> Read<Lgc> {      fn new( -        stream: BoundJabberReader<Tls>, +        stream: BoundJabberReader,          tasks: JoinSet<()>,          connected: Connected,          logic: Lgc, @@ -129,7 +129,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {                                  let stream_error = match e {                                      peanuts::Error::ReadError(error) => None,                                      peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding), -                                    peanuts::Error::ParseError(_) => Some(StreamError::BadFormat), +                                    peanuts::Error::ParseError(_, _) => Some(StreamError::BadFormat),                                      peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml),                                      peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding),                                      peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed), @@ -140,6 +140,7 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {                                      peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace),                                      peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml),                                      peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml), +                                    _ => None,                                  };                                  let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks })); @@ -169,7 +170,8 @@ pub enum ReadControl {  pub struct ReadControlHandle {      sender: mpsc::Sender<ReadControl>, -    pub(crate) handle: JoinHandle<()>, +    // TODO: same here +    // pub(crate) handle: JoinHandle<()>,  }  impl Deref for ReadControlHandle { @@ -188,7 +190,7 @@ impl DerefMut for ReadControlHandle {  impl ReadControlHandle {      pub fn new<Lgc: Clone + Logic + Send + 'static>( -        stream: BoundJabberReader<Tls>, +        stream: BoundJabberReader,          connected: Connected,          logic: Lgc,          supervisor_control: SupervisorSender, @@ -205,16 +207,18 @@ impl ReadControlHandle {              control_receiver,              on_crash,          ); -        let handle = tokio::spawn(async move { actor.run().await }); +        #[cfg(target_arch = "wasm32")] +        wasm_bindgen_futures::spawn_local(async move { actor.run().await }); +        #[cfg(not(target_arch = "wasm32"))] +        tokio::spawn(async move { actor.run().await });          Self {              sender: control_sender, -            handle,          }      }      pub fn reconnect<Lgc: Clone + Logic + Send + 'static>( -        stream: BoundJabberReader<Tls>, +        stream: BoundJabberReader,          tasks: JoinSet<()>,          connected: Connected,          logic: Lgc, @@ -232,11 +236,13 @@ impl ReadControlHandle {              control_receiver,              on_crash,          ); -        let handle = tokio::spawn(async move { actor.run().await }); +        #[cfg(target_arch = "wasm32")] +        wasm_bindgen_futures::spawn_local(async move { actor.run().await }); +        #[cfg(not(target_arch = "wasm32"))] +        tokio::spawn(async move { actor.run().await });          Self {              sender: control_sender, -            handle,          }      }  } diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs index 1070cdf..4c6ed24 100644 --- a/lampada/src/connection/write.rs +++ b/lampada/src/connection/write.rs @@ -1,6 +1,6 @@  use std::ops::{Deref, DerefMut}; -use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter}; +use luz::jabber_stream::bound_stream::BoundJabberWriter;  use stanza::{      client::Stanza, stream::Error as StreamErrorStanza, stream_error::Error as StreamError,  }; @@ -13,7 +13,7 @@ use crate::error::WriteError;  /// actor that receives jabber stanzas to write, and if there is an error, sends a message back to the supervisor then aborts, so the supervisor can spawn a new stream.  pub struct Write { -    stream: BoundJabberWriter<Tls>, +    stream: BoundJabberWriter,      /// connection session write queue      stanza_receiver: mpsc::Receiver<WriteMessage>, @@ -41,7 +41,7 @@ pub enum WriteControl {  impl Write {      fn new( -        stream: BoundJabberWriter<Tls>, +        stream: BoundJabberWriter,          stanza_receiver: mpsc::Receiver<WriteMessage>,          control_receiver: mpsc::Receiver<WriteControl>,          on_crash: oneshot::Sender<(WriteMessage, WriteState)>, @@ -192,7 +192,7 @@ impl DerefMut for WriteHandle {  pub struct WriteControlHandle {      sender: mpsc::Sender<WriteControl>, -    pub(crate) handle: JoinHandle<()>, +    // pub(crate) handle: JoinHandle<()>,  }  impl Deref for WriteControlHandle { @@ -211,14 +211,17 @@ impl DerefMut for WriteControlHandle {  impl WriteControlHandle {      pub fn new( -        stream: BoundJabberWriter<Tls>, +        stream: BoundJabberWriter,          on_crash: oneshot::Sender<(WriteMessage, WriteState)>,      ) -> (WriteHandle, Self) {          let (control_sender, control_receiver) = mpsc::channel(20);          let (stanza_sender, stanza_receiver) = mpsc::channel(20);          let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); -        let handle = tokio::spawn(async move { actor.run().await }); +        #[cfg(target_arch = "wasm32")] +        wasm_bindgen_futures::spawn_local(async move { actor.run().await }); +        #[cfg(not(target_arch = "wasm32"))] +        tokio::spawn(async move { actor.run().await });          (              WriteHandle { @@ -226,13 +229,12 @@ impl WriteControlHandle {              },              Self {                  sender: control_sender, -                handle,              },          )      }      pub fn reconnect_retry( -        stream: BoundJabberWriter<Tls>, +        stream: BoundJabberWriter,          on_crash: oneshot::Sender<(WriteMessage, WriteState)>,          stanza_receiver: mpsc::Receiver<WriteMessage>,          retry_msg: WriteMessage, @@ -240,27 +242,32 @@ impl WriteControlHandle {          let (control_sender, control_receiver) = mpsc::channel(20);          let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); -        let handle = tokio::spawn(async move { actor.run_reconnected(retry_msg).await }); +        #[cfg(target_arch = "wasm32")] +        wasm_bindgen_futures::spawn_local(async move { actor.run_reconnected(retry_msg).await }); +        #[cfg(not(target_arch = "wasm32"))] +        tokio::spawn(async move { actor.run_reconnected(retry_msg).await });          Self {              sender: control_sender, -            handle,          }      }      pub fn reconnect( -        stream: BoundJabberWriter<Tls>, +        stream: BoundJabberWriter,          on_crash: oneshot::Sender<(WriteMessage, WriteState)>,          stanza_receiver: mpsc::Receiver<WriteMessage>,      ) -> Self {          let (control_sender, control_receiver) = mpsc::channel(20);          let actor = Write::new(stream, stanza_receiver, control_receiver, on_crash); -        let handle = tokio::spawn(async move { actor.run().await }); + +        #[cfg(target_arch = "wasm32")] +        wasm_bindgen_futures::spawn_local(async move { actor.run().await }); +        #[cfg(not(target_arch = "wasm32"))] +        tokio::spawn(async move { actor.run().await });          Self {              sender: control_sender, -            handle,          }      }  } | 
