aboutsummaryrefslogblamecommitdiffstats
path: root/lampada/src/connection/read.rs
blob: cc69387b440ec49f7a0ef2f4f9169c02c9ed84eb (plain) (tree)
1
2
3
4
5
6
7
8
9
          
                         
                        
                           
                 
              


                   
                                                                           

                           
                                 

                                
                  

                              
 
                                                                     
 
              
                      
                                   

                                               






                                                                                                                                        
               











                                                                                                                 

 
                     
           
                                       
                           
                             
                   


                                                      
               
                                               
              
                   

                                      





                               

         
 
 
                                                     
                            


                                                           

                            
                                                                                  

                                                                          
                                                 

                          

                                                             










                                                                                  
                                                                                                                              

                                  

                      


                                                     
                                  
                                                                                                                                           

                                   
                                                       






















                                                                                                                                                                                      
                                                                                                                                         


                                  
                     


                             

                                         
                                    


     




                                 
 
                      
               
                                      



                                      














                                                  


                        
                                                    
                                       
                             
                   

                                             



                                                                   
                   
                           




                               








                                                                    
                                                          
                                       
                           
                             
                   

                                             



                                                                   
                   
                  




                               








                                                                    
use std::{
    collections::HashMap,
    marker::PhantomData,
    ops::{Deref, DerefMut},
    str::FromStr,
    sync::Arc,
    time::Duration,
};

use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use stanza::client::Stanza;
use tokio::{
    sync::{mpsc, oneshot, Mutex},
    task::{JoinHandle, JoinSet},
};
use tracing::info;

use crate::{Connected, Logic};

use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};

/// read actor
pub struct Read<Lgc> {
    stream: BoundJabberReader<Tls>,
    disconnecting: bool,
    disconnect_timedout: oneshot::Receiver<()>,

    // all the threads spawned by the current connection session
    tasks: JoinSet<()>,

    // for handling incoming stanzas
    // jabber server must be able to both terminate the connection from error, and ask for data from the client (such as supported XEPs)
    connected: Connected,
    logic: Lgc,
    supervisor_control: SupervisorSender,

    // control stuff
    control_receiver: mpsc::Receiver<ReadControl>,
    on_crash: oneshot::Sender<ReadState>,
}

/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
pub struct ReadState {
    pub supervisor_control: SupervisorSender,
    // TODO: when a stream dies, the iq gets from the server should not be replied to on the new stream
    pub tasks: JoinSet<()>,
}

impl<Lgc> Read<Lgc> {
    fn new(
        stream: BoundJabberReader<Tls>,
        tasks: JoinSet<()>,
        connected: Connected,
        logic: Lgc,
        supervisor_control: SupervisorSender,
        control_receiver: mpsc::Receiver<ReadControl>,
        on_crash: oneshot::Sender<ReadState>,
    ) -> Self {
        let (_send, recv) = oneshot::channel();
        Self {
            stream,
            disconnecting: false,
            disconnect_timedout: recv,
            tasks,
            connected,
            logic,
            supervisor_control,
            control_receiver,
            on_crash,
        }
    }
}

impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
    async fn run(mut self) {
        println!("started read thread");
        // let stanza = self.stream.read::<Stanza>().await;
        // println!("{:?}", stanza);
        loop {
            tokio::select! {
                // if still haven't received the end tag in time, just kill itself
                // TODO: is this okay??? what if notification thread dies?
                Ok(()) = &mut self.disconnect_timedout => {
                    info!("disconnect_timedout");
                    break;
                }
                Some(msg) = self.control_receiver.recv() => {
                    match msg {
                        // when disconnect received,
                        ReadControl::Disconnect => {
                            let (send, recv) = oneshot::channel();
                            self.disconnect_timedout = recv;
                            self.disconnecting = true;
                            tokio::spawn(async {
                                tokio::time::sleep(Duration::from_secs(10)).await;
                                let _ = send.send(());
                            })
                        },
                        ReadControl::Abort(sender) => {
                            let _ = sender.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
                            break;
                        },
                    };
                },
                s = self.stream.read::<Stanza>() => {
                    println!("read stanza");
                    match s {
                        Ok(s) => {
                            self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
                        },
                        Err(e) => {
                            println!("error: {:?}", e);
                            // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
                            // match e {
                            //     peanuts::Error::ReadError(error) => todo!(),
                            //     peanuts::Error::Utf8Error(utf8_error) => todo!(),
                            //     peanuts::Error::ParseError(_) => todo!(),
                            //     peanuts::Error::EntityProcessError(_) => todo!(),
                            //     peanuts::Error::InvalidCharRef(_) => todo!(),
                            //     peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
                            //     peanuts::Error::DuplicateAttribute(_) => todo!(),
                            //     peanuts::Error::UnqualifiedNamespace(_) => todo!(),
                            //     peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
                            //     peanuts::Error::NotInElement(_) => todo!(),
                            //     peanuts::Error::ExtraData(_) => todo!(),
                            //     peanuts::Error::UndeclaredNamespace(_) => todo!(),
                            //     peanuts::Error::IncorrectName(name) => todo!(),
                            //     peanuts::Error::DeserializeError(_) => todo!(),
                            //     peanuts::Error::Deserialize(deserialize_error) => todo!(),
                            //     peanuts::Error::RootElementEnded => todo!(),
                            // }
                            // TODO: make sure this only happens when an end tag is received
                            if self.disconnecting == true {
                                break;
                            } else {
                                let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
                            }
                            break;
                        },
                    }
                },
                else => break
            }
        }
        println!("stopping read thread");
        self.logic.on_abort().await;
    }
}

// what do stanza processes do?
// - update ui
// - access database
// - disconnect proper, reconnect
// - respond to server requests

pub enum ReadControl {
    Disconnect,
    Abort(oneshot::Sender<ReadState>),
}

pub struct ReadControlHandle {
    sender: mpsc::Sender<ReadControl>,
    pub(crate) handle: JoinHandle<()>,
}

impl Deref for ReadControlHandle {
    type Target = mpsc::Sender<ReadControl>;

    fn deref(&self) -> &Self::Target {
        &self.sender
    }
}

impl DerefMut for ReadControlHandle {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.sender
    }
}

impl ReadControlHandle {
    pub fn new<Lgc: Clone + Logic + Send + 'static>(
        stream: BoundJabberReader<Tls>,
        connected: Connected,
        logic: Lgc,
        supervisor_control: SupervisorSender,
        on_crash: oneshot::Sender<ReadState>,
    ) -> Self {
        let (control_sender, control_receiver) = mpsc::channel(20);

        let actor = Read::new(
            stream,
            JoinSet::new(),
            connected,
            logic,
            supervisor_control,
            control_receiver,
            on_crash,
        );
        let handle = tokio::spawn(async move { actor.run().await });

        Self {
            sender: control_sender,
            handle,
        }
    }

    pub fn reconnect<Lgc: Clone + Logic + Send + 'static>(
        stream: BoundJabberReader<Tls>,
        tasks: JoinSet<()>,
        connected: Connected,
        logic: Lgc,
        supervisor_control: SupervisorSender,
        on_crash: oneshot::Sender<ReadState>,
    ) -> Self {
        let (control_sender, control_receiver) = mpsc::channel(20);

        let actor = Read::new(
            stream,
            tasks,
            connected,
            logic,
            supervisor_control,
            control_receiver,
            on_crash,
        );
        let handle = tokio::spawn(async move { actor.run().await });

        Self {
            sender: control_sender,
            handle,
        }
    }
}