diff options
| author | 2025-03-26 15:29:11 +0000 | |
|---|---|---|
| committer | 2025-03-26 15:29:11 +0000 | |
| commit | bf677e1f9ce07e2fa8971c15b9a082cddbb40dec (patch) | |
| tree | d8a6c8bec63c774322b207af43ad573d730ee1a2 /filamento | |
| parent | 2211f324782cdc617b4b5ecd071178e372539fe4 (diff) | |
| download | luz-bf677e1f9ce07e2fa8971c15b9a082cddbb40dec.tar.gz luz-bf677e1f9ce07e2fa8971c15b9a082cddbb40dec.tar.bz2 luz-bf677e1f9ce07e2fa8971c15b9a082cddbb40dec.zip | |
refactor(filament): split logic into different files
Diffstat (limited to '')
| -rw-r--r-- | filamento/.gitignore | 1 | ||||
| -rw-r--r-- | filamento/src/lib.rs | 1124 | ||||
| -rw-r--r-- | filamento/src/logic/abort.rs | 10 | ||||
| -rw-r--r-- | filamento/src/logic/connect.rs | 91 | ||||
| -rw-r--r-- | filamento/src/logic/connection_error.rs | 12 | ||||
| -rw-r--r-- | filamento/src/logic/disconnect.rs | 18 | ||||
| -rw-r--r-- | filamento/src/logic/mod.rs | 90 | ||||
| -rw-r--r-- | filamento/src/logic/offline.rs | 110 | ||||
| -rw-r--r-- | filamento/src/logic/online.rs | 661 | ||||
| -rw-r--r-- | filamento/src/logic/process_stanza.rs | 264 | 
10 files changed, 1260 insertions, 1121 deletions
| diff --git a/filamento/.gitignore b/filamento/.gitignore new file mode 100644 index 0000000..8bb4037 --- /dev/null +++ b/filamento/.gitignore @@ -0,0 +1 @@ +filamento.db diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs index db59a67..030dc43 100644 --- a/filamento/src/lib.rs +++ b/filamento/src/lib.rs @@ -19,6 +19,7 @@ use lampada::{      Connected, CoreClient, CoreClientCommand, Logic, SupervisorSender, WriteMessage,      error::{ActorError, CommandError, ConnectionError, ReadError, WriteError},  }; +use logic::ClientLogic;  use presence::{Offline, Online, Presence, PresenceType, Show};  use roster::{Contact, ContactUpdate};  use stanza::client::{ @@ -36,6 +37,7 @@ use uuid::Uuid;  pub mod chat;  pub mod db;  pub mod error; +mod logic;  pub mod presence;  pub mod roster;  pub mod user; @@ -145,11 +147,7 @@ impl Client {          let (_sup_send, sup_recv) = oneshot::channel();          let sup_recv = sup_recv.fuse(); -        let logic = ClientLogic { -            db, -            pending: Arc::new(Mutex::new(HashMap::new())), -            update_sender: update_send, -        }; +        let logic = ClientLogic::new(db, Arc::new(Mutex::new(HashMap::new())), update_send);          let actor: CoreClient<ClientLogic> =              CoreClient::new(jid, password, command_receiver, None, sup_recv, logic); @@ -450,1122 +448,6 @@ impl Client {      }  } -#[derive(Clone)] -pub struct ClientLogic { -    db: Db, -    pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, -    update_sender: mpsc::Sender<UpdateMessage>, -} - -impl Logic for ClientLogic { -    type Cmd = Command; - -    async fn handle_connect(self, connection: Connected) { -        let (send, recv) = oneshot::channel(); -        debug!("getting roster"); -        self.clone() -            .handle_online(Command::GetRoster(send), connection.clone()) -            .await; -        debug!("sent roster req"); -        let roster = recv.await; -        debug!("got roster"); -        match roster { -            Ok(r) => match r { -                Ok(roster) => { -                    let online = self.db.read_cached_status().await; -                    let online = match online { -                        Ok(online) => online, -                        Err(e) => { -                            let _ = self -                                .update_sender -                                .send(UpdateMessage::Error(Error::Connecting( -                                    ConnectionJobError::StatusCacheError(e.into()), -                                ))) -                                .await; -                            Online::default() -                        } -                    }; -                    let (send, recv) = oneshot::channel(); -                    self.clone() -                        .handle_online( -                            Command::SendPresence(None, PresenceType::Online(online.clone()), send), -                            connection, -                        ) -                        .await; -                    let set_status = recv.await; -                    match set_status { -                        Ok(s) => match s { -                            Ok(()) => { -                                let _ = self -                                    .update_sender -                                    .send(UpdateMessage::Online(online, roster)) -                                    .await; -                            } -                            Err(e) => { -                                let _ = self -                                    .update_sender -                                    .send(UpdateMessage::Error(Error::Connecting(e.into()))) -                                    .await; -                            } -                        }, -                        Err(e) => { -                            let _ = self -                                .update_sender -                                .send(UpdateMessage::Error(Error::Connecting( -                                    ConnectionJobError::SendPresence(WriteError::Actor(e.into())), -                                ))) -                                .await; -                        } -                    } -                } -                Err(e) => { -                    let _ = self -                        .update_sender -                        .send(UpdateMessage::Error(Error::Connecting(e.into()))) -                        .await; -                } -            }, -            Err(e) => { -                let _ = self -                    .update_sender -                    .send(UpdateMessage::Error(Error::Connecting( -                        ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( -                            e.into(), -                        ))), -                    ))) -                    .await; -            } -        } -    } - -    async fn handle_disconnect(self, connection: Connected) { -        // TODO: be able to set offline status message -        let offline_presence: stanza::client::presence::Presence = -            Offline::default().into_stanza(None); -        let stanza = Stanza::Presence(offline_presence); -        // TODO: timeout and error check -        connection.write_handle().write(stanza).await; -        let _ = self -            .update_sender -            .send(UpdateMessage::Offline(Offline::default())) -            .await; -    } - -    async fn handle_stanza( -        self, -        stanza: Stanza, -        connection: Connected, -        supervisor: SupervisorSender, -    ) { -        match stanza { -            Stanza::Message(stanza_message) => { -                if let Some(mut from) = stanza_message.from { -                    // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. -                    let timestamp = stanza_message -                        .delay -                        .map(|delay| delay.stamp) -                        .unwrap_or_else(|| Utc::now()); -                    // TODO: group chat messages -                    let mut message = Message { -                        id: stanza_message -                            .id -                            // TODO: proper id storage -                            .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) -                            .unwrap_or_else(|| Uuid::new_v4()), -                        from: from.clone(), -                        timestamp, -                        body: Body { -                            // TODO: should this be an option? -                            body: stanza_message -                                .body -                                .map(|body| body.body) -                                .unwrap_or_default() -                                .unwrap_or_default(), -                        }, -                    }; -                    // TODO: can this be more efficient? -                    let result = self -                        .db -                        .create_message_with_user_resource_and_chat(message.clone(), from.clone()) -                        .await; -                    if let Err(e) = result { -                        tracing::error!("messagecreate"); -                        let _ = self -                            .update_sender -                            .send(UpdateMessage::Error(Error::MessageRecv( -                                MessageRecvError::MessageHistory(e.into()), -                            ))) -                            .await; -                    } -                    message.from = message.from.as_bare(); -                    from = from.as_bare(); -                    let _ = self -                        .update_sender -                        .send(UpdateMessage::Message { to: from, message }) -                        .await; -                } else { -                    let _ = self -                        .update_sender -                        .send(UpdateMessage::Error(Error::MessageRecv( -                            MessageRecvError::MissingFrom, -                        ))) -                        .await; -                } -            } -            Stanza::Presence(presence) => { -                if let Some(from) = presence.from { -                    match presence.r#type { -                        Some(r#type) => match r#type { -                            // error processing a presence from somebody -                            stanza::client::presence::PresenceType::Error => { -                                // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. -                                let _ = self -                                    .update_sender -                                    .send(UpdateMessage::Error(Error::Presence( -                                        // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display -                                        PresenceError::StanzaError( -                                            presence -                                                .errors -                                                .first() -                                                .cloned() -                                                .expect("error MUST have error"), -                                        ), -                                    ))) -                                    .await; -                            } -                            // should not happen (error to server) -                            stanza::client::presence::PresenceType::Probe => { -                                // TODO: should probably write an error and restart stream -                                let _ = self -                                    .update_sender -                                    .send(UpdateMessage::Error(Error::Presence( -                                        PresenceError::Unsupported, -                                    ))) -                                    .await; -                            } -                            stanza::client::presence::PresenceType::Subscribe => { -                                // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event -                                let _ = self -                                    .update_sender -                                    .send(UpdateMessage::SubscriptionRequest(from)) -                                    .await; -                            } -                            stanza::client::presence::PresenceType::Unavailable => { -                                let offline = Offline { -                                    status: presence.status.map(|status| status.status.0), -                                }; -                                let timestamp = presence -                                    .delay -                                    .map(|delay| delay.stamp) -                                    .unwrap_or_else(|| Utc::now()); -                                let _ = self -                                    .update_sender -                                    .send(UpdateMessage::Presence { -                                        from, -                                        presence: Presence { -                                            timestamp, -                                            presence: PresenceType::Offline(offline), -                                        }, -                                    }) -                                    .await; -                            } -                            // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. -                            stanza::client::presence::PresenceType::Subscribed => {} -                            stanza::client::presence::PresenceType::Unsubscribe => {} -                            stanza::client::presence::PresenceType::Unsubscribed => {} -                        }, -                        None => { -                            let online = Online { -                                show: presence.show.map(|show| match show { -                                    stanza::client::presence::Show::Away => Show::Away, -                                    stanza::client::presence::Show::Chat => Show::Chat, -                                    stanza::client::presence::Show::Dnd => Show::DoNotDisturb, -                                    stanza::client::presence::Show::Xa => Show::ExtendedAway, -                                }), -                                status: presence.status.map(|status| status.status.0), -                                priority: presence.priority.map(|priority| priority.0), -                            }; -                            let timestamp = presence -                                .delay -                                .map(|delay| delay.stamp) -                                .unwrap_or_else(|| Utc::now()); -                            let _ = self -                                .update_sender -                                .send(UpdateMessage::Presence { -                                    from, -                                    presence: Presence { -                                        timestamp, -                                        presence: PresenceType::Online(online), -                                    }, -                                }) -                                .await; -                        } -                    } -                } else { -                    let _ = self -                        .update_sender -                        .send(UpdateMessage::Error(Error::Presence( -                            PresenceError::MissingFrom, -                        ))) -                        .await; -                } -            } -            Stanza::Iq(iq) => match iq.r#type { -                stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { -                    let send; -                    { -                        send = self.pending.lock().await.remove(&iq.id); -                    } -                    if let Some(send) = send { -                        send.send(Ok(Stanza::Iq(iq))); -                    } else { -                        let _ = self -                            .update_sender -                            .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( -                                iq.id, -                            )))) -                            .await; -                    } -                } -                // TODO: send unsupported to server -                // TODO: proper errors i am so tired please -                stanza::client::iq::IqType::Get => {} -                stanza::client::iq::IqType::Set => { -                    if let Some(query) = iq.query { -                        match query { -                            stanza::client::iq::Query::Roster(mut query) => { -                                // TODO: there should only be one -                                if let Some(item) = query.items.pop() { -                                    match item.subscription { -                                        Some(stanza::roster::Subscription::Remove) => { -                                            self.db.delete_contact(item.jid.clone()).await; -                                            self.update_sender -                                                .send(UpdateMessage::RosterDelete(item.jid)) -                                                .await; -                                            // TODO: send result -                                        } -                                        _ => { -                                            let contact: Contact = item.into(); -                                            if let Err(e) = -                                                self.db.upsert_contact(contact.clone()).await -                                            { -                                                let _ = self -                                                    .update_sender -                                                    .send(UpdateMessage::Error(Error::Roster( -                                                        RosterError::Cache(e.into()), -                                                    ))) -                                                    .await; -                                            } -                                            let _ = self -                                                .update_sender -                                                .send(UpdateMessage::RosterUpdate(contact)) -                                                .await; -                                            // TODO: send result -                                            // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { -                                            //     from: , -                                            //     id: todo!(), -                                            //     to: todo!(), -                                            //     r#type: todo!(), -                                            //     lang: todo!(), -                                            //     query: todo!(), -                                            //     errors: todo!(), -                                            // })); -                                        } -                                    } -                                } -                            } -                            // TODO: send unsupported to server -                            _ => {} -                        } -                    } else { -                        // TODO: send error (unsupported) to server -                    } -                } -            }, -            Stanza::Error(error) => { -                let _ = self -                    .update_sender -                    .send(UpdateMessage::Error(Error::Stream(error))) -                    .await; -                // TODO: reconnect -            } -            Stanza::OtherContent(content) => { -                let _ = self -                    .update_sender -                    .send(UpdateMessage::Error(Error::UnrecognizedContent)); -                // TODO: send error to write_thread -            } -        } -    } - -    async fn handle_online(self, command: Command, connection: Connected) { -        match command { -            Command::GetRoster(result_sender) => { -                // TODO: jid resource should probably be stored within the connection -                debug!("before client_jid lock"); -                debug!("after client_jid lock"); -                let iq_id = Uuid::new_v4().to_string(); -                let (send, iq_recv) = oneshot::channel(); -                { -                    self.pending.lock().await.insert(iq_id.clone(), send); -                } -                let stanza = Stanza::Iq(Iq { -                    from: Some(connection.jid().clone()), -                    id: iq_id.to_string(), -                    to: None, -                    r#type: IqType::Get, -                    lang: None, -                    query: Some(iq::Query::Roster(stanza::roster::Query { -                        ver: None, -                        items: Vec::new(), -                    })), -                    errors: Vec::new(), -                }); -                let (send, recv) = oneshot::channel(); -                let _ = connection -                    .write_handle() -                    .send(WriteMessage { -                        stanza, -                        respond_to: send, -                    }) -                    .await; -                // TODO: timeout -                match recv.await { -                    Ok(Ok(())) => info!("roster request sent"), -                    Ok(Err(e)) => { -                        // TODO: log errors if fail to send -                        let _ = result_sender.send(Err(RosterError::Write(e.into()))); -                        return; -                    } -                    Err(e) => { -                        let _ = result_sender -                            .send(Err(RosterError::Write(WriteError::Actor(e.into())))); -                        return; -                    } -                }; -                // TODO: timeout -                match iq_recv.await { -                    Ok(Ok(stanza)) => match stanza { -                        Stanza::Iq(Iq { -                            from: _, -                            id, -                            to: _, -                            r#type, -                            lang: _, -                            query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), -                            errors: _, -                        }) if id == iq_id && r#type == IqType::Result => { -                            let contacts: Vec<Contact> = -                                items.into_iter().map(|item| item.into()).collect(); -                            if let Err(e) = self.db.replace_cached_roster(contacts.clone()).await { -                                self.update_sender -                                    .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( -                                        e.into(), -                                    )))) -                                    .await; -                            }; -                            result_sender.send(Ok(contacts)); -                            return; -                        } -                        ref s @ Stanza::Iq(Iq { -                            from: _, -                            ref id, -                            to: _, -                            r#type, -                            lang: _, -                            query: _, -                            ref errors, -                        }) if *id == iq_id && r#type == IqType::Error => { -                            if let Some(error) = errors.first() { -                                result_sender.send(Err(RosterError::StanzaError(error.clone()))); -                            } else { -                                result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); -                            } -                            return; -                        } -                        s => { -                            result_sender.send(Err(RosterError::UnexpectedStanza(s))); -                            return; -                        } -                    }, -                    Ok(Err(e)) => { -                        result_sender.send(Err(RosterError::Read(e))); -                        return; -                    } -                    Err(e) => { -                        result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); -                        return; -                    } -                } -            } -            Command::GetChats(sender) => { -                let chats = self.db.read_chats().await.map_err(|e| e.into()); -                sender.send(chats); -            } -            Command::GetChatsOrdered(sender) => { -                let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); -                sender.send(chats); -            } -            Command::GetChatsOrderedWithLatestMessages(sender) => { -                let chats = self -                    .db -                    .read_chats_ordered_with_latest_messages() -                    .await -                    .map_err(|e| e.into()); -                sender.send(chats); -            } -            Command::GetChat(jid, sender) => { -                let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); -                sender.send(chats); -            } -            Command::GetMessages(jid, sender) => { -                let messages = self -                    .db -                    .read_message_history(jid) -                    .await -                    .map_err(|e| e.into()); -                sender.send(messages); -            } -            Command::DeleteChat(jid, sender) => { -                let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); -                sender.send(result); -            } -            Command::DeleteMessage(uuid, sender) => { -                let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); -                sender.send(result); -            } -            Command::GetUser(jid, sender) => { -                let user = self.db.read_user(jid).await.map_err(|e| e.into()); -                sender.send(user); -            } -            // TODO: offline queue to modify roster -            Command::AddContact(jid, sender) => { -                let iq_id = Uuid::new_v4().to_string(); -                let set_stanza = Stanza::Iq(Iq { -                    from: Some(connection.jid().clone()), -                    id: iq_id.clone(), -                    to: None, -                    r#type: IqType::Set, -                    lang: None, -                    query: Some(iq::Query::Roster(stanza::roster::Query { -                        ver: None, -                        items: vec![stanza::roster::Item { -                            approved: None, -                            ask: false, -                            jid, -                            name: None, -                            subscription: None, -                            groups: Vec::new(), -                        }], -                    })), -                    errors: Vec::new(), -                }); -                let (send, recv) = oneshot::channel(); -                { -                    self.pending.lock().await.insert(iq_id.clone(), send); -                } -                // TODO: write_handle send helper function -                let result = connection.write_handle().write(set_stanza).await; -                if let Err(e) = result { -                    sender.send(Err(RosterError::Write(e))); -                    return; -                } -                let iq_result = recv.await; -                match iq_result { -                    Ok(i) => match i { -                        Ok(iq_result) => match iq_result { -                            Stanza::Iq(Iq { -                                from: _, -                                id, -                                to: _, -                                r#type, -                                lang: _, -                                query: _, -                                errors: _, -                            }) if id == iq_id && r#type == IqType::Result => { -                                sender.send(Ok(())); -                                return; -                            } -                            ref s @ Stanza::Iq(Iq { -                                from: _, -                                ref id, -                                to: _, -                                r#type, -                                lang: _, -                                query: _, -                                ref errors, -                            }) if *id == iq_id && r#type == IqType::Error => { -                                if let Some(error) = errors.first() { -                                    sender.send(Err(RosterError::StanzaError(error.clone()))); -                                } else { -                                    sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); -                                } -                                return; -                            } -                            s => { -                                sender.send(Err(RosterError::UnexpectedStanza(s))); -                                return; -                            } -                        }, -                        Err(e) => { -                            sender.send(Err(e.into())); -                            return; -                        } -                    }, -                    Err(e) => { -                        sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); -                        return; -                    } -                } -            } -            Command::BuddyRequest(jid, sender) => { -                let presence = Stanza::Presence(stanza::client::presence::Presence { -                    from: None, -                    id: None, -                    to: Some(jid.clone()), -                    r#type: Some(stanza::client::presence::PresenceType::Subscribe), -                    lang: None, -                    show: None, -                    status: None, -                    priority: None, -                    errors: Vec::new(), -                    delay: None, -                }); -                let result = connection.write_handle().write(presence).await; -                match result { -                    Err(_) => { -                        let _ = sender.send(result); -                    } -                    Ok(()) => { -                        let presence = Stanza::Presence(stanza::client::presence::Presence { -                            from: None, -                            id: None, -                            to: Some(jid), -                            r#type: Some(stanza::client::presence::PresenceType::Subscribed), -                            lang: None, -                            show: None, -                            status: None, -                            priority: None, -                            errors: Vec::new(), -                            delay: None, -                        }); -                        let result = connection.write_handle().write(presence).await; -                        let _ = sender.send(result); -                    } -                } -            } -            Command::SubscriptionRequest(jid, sender) => { -                // TODO: i should probably have builders -                let presence = Stanza::Presence(stanza::client::presence::Presence { -                    from: None, -                    id: None, -                    to: Some(jid), -                    r#type: Some(stanza::client::presence::PresenceType::Subscribe), -                    lang: None, -                    show: None, -                    status: None, -                    priority: None, -                    errors: Vec::new(), -                    delay: None, -                }); -                let result = connection.write_handle().write(presence).await; -                let _ = sender.send(result); -            } -            Command::AcceptBuddyRequest(jid, sender) => { -                let presence = Stanza::Presence(stanza::client::presence::Presence { -                    from: None, -                    id: None, -                    to: Some(jid.clone()), -                    r#type: Some(stanza::client::presence::PresenceType::Subscribed), -                    lang: None, -                    show: None, -                    status: None, -                    priority: None, -                    errors: Vec::new(), -                    delay: None, -                }); -                let result = connection.write_handle().write(presence).await; -                match result { -                    Err(_) => { -                        let _ = sender.send(result); -                    } -                    Ok(()) => { -                        let presence = Stanza::Presence(stanza::client::presence::Presence { -                            from: None, -                            id: None, -                            to: Some(jid), -                            r#type: Some(stanza::client::presence::PresenceType::Subscribe), -                            lang: None, -                            show: None, -                            status: None, -                            priority: None, -                            errors: Vec::new(), -                            delay: None, -                        }); -                        let result = connection.write_handle().write(presence).await; -                        let _ = sender.send(result); -                    } -                } -            } -            Command::AcceptSubscriptionRequest(jid, sender) => { -                let presence = Stanza::Presence(stanza::client::presence::Presence { -                    from: None, -                    id: None, -                    to: Some(jid), -                    r#type: Some(stanza::client::presence::PresenceType::Subscribe), -                    lang: None, -                    show: None, -                    status: None, -                    priority: None, -                    errors: Vec::new(), -                    delay: None, -                }); -                let result = connection.write_handle().write(presence).await; -                let _ = sender.send(result); -            } -            Command::UnsubscribeFromContact(jid, sender) => { -                let presence = Stanza::Presence(stanza::client::presence::Presence { -                    from: None, -                    id: None, -                    to: Some(jid), -                    r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), -                    lang: None, -                    show: None, -                    status: None, -                    priority: None, -                    errors: Vec::new(), -                    delay: None, -                }); -                let result = connection.write_handle().write(presence).await; -                let _ = sender.send(result); -            } -            Command::UnsubscribeContact(jid, sender) => { -                let presence = Stanza::Presence(stanza::client::presence::Presence { -                    from: None, -                    id: None, -                    to: Some(jid), -                    r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), -                    lang: None, -                    show: None, -                    status: None, -                    priority: None, -                    errors: Vec::new(), -                    delay: None, -                }); -                let result = connection.write_handle().write(presence).await; -                let _ = sender.send(result); -            } -            Command::UnfriendContact(jid, sender) => { -                let presence = Stanza::Presence(stanza::client::presence::Presence { -                    from: None, -                    id: None, -                    to: Some(jid.clone()), -                    r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), -                    lang: None, -                    show: None, -                    status: None, -                    priority: None, -                    errors: Vec::new(), -                    delay: None, -                }); -                let result = connection.write_handle().write(presence).await; -                match result { -                    Err(_) => { -                        let _ = sender.send(result); -                    } -                    Ok(()) => { -                        let presence = Stanza::Presence(stanza::client::presence::Presence { -                            from: None, -                            id: None, -                            to: Some(jid), -                            r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), -                            lang: None, -                            show: None, -                            status: None, -                            priority: None, -                            errors: Vec::new(), -                            delay: None, -                        }); -                        let result = connection.write_handle().write(presence).await; -                        let _ = sender.send(result); -                    } -                } -            } -            Command::DeleteContact(jid, sender) => { -                let iq_id = Uuid::new_v4().to_string(); -                let set_stanza = Stanza::Iq(Iq { -                    from: Some(connection.jid().clone()), -                    id: iq_id.clone(), -                    to: None, -                    r#type: IqType::Set, -                    lang: None, -                    query: Some(iq::Query::Roster(stanza::roster::Query { -                        ver: None, -                        items: vec![stanza::roster::Item { -                            approved: None, -                            ask: false, -                            jid, -                            name: None, -                            subscription: Some(stanza::roster::Subscription::Remove), -                            groups: Vec::new(), -                        }], -                    })), -                    errors: Vec::new(), -                }); -                let (send, recv) = oneshot::channel(); -                { -                    self.pending.lock().await.insert(iq_id.clone(), send); -                } -                let result = connection.write_handle().write(set_stanza).await; -                if let Err(e) = result { -                    sender.send(Err(RosterError::Write(e))); -                    return; -                } -                let iq_result = recv.await; -                match iq_result { -                    Ok(i) => match i { -                        Ok(iq_result) => match iq_result { -                            Stanza::Iq(Iq { -                                from: _, -                                id, -                                to: _, -                                r#type, -                                lang: _, -                                query: _, -                                errors: _, -                            }) if id == iq_id && r#type == IqType::Result => { -                                sender.send(Ok(())); -                                return; -                            } -                            ref s @ Stanza::Iq(Iq { -                                from: _, -                                ref id, -                                to: _, -                                r#type, -                                lang: _, -                                query: _, -                                ref errors, -                            }) if *id == iq_id && r#type == IqType::Error => { -                                if let Some(error) = errors.first() { -                                    sender.send(Err(RosterError::StanzaError(error.clone()))); -                                } else { -                                    sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); -                                } -                                return; -                            } -                            s => { -                                sender.send(Err(RosterError::UnexpectedStanza(s))); -                                return; -                            } -                        }, -                        Err(e) => { -                            sender.send(Err(e.into())); -                            return; -                        } -                    }, -                    Err(e) => { -                        sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); -                        return; -                    } -                } -            } -            Command::UpdateContact(jid, contact_update, sender) => { -                let iq_id = Uuid::new_v4().to_string(); -                let groups = Vec::from_iter( -                    contact_update -                        .groups -                        .into_iter() -                        .map(|group| stanza::roster::Group(Some(group))), -                ); -                let set_stanza = Stanza::Iq(Iq { -                    from: Some(connection.jid().clone()), -                    id: iq_id.clone(), -                    to: None, -                    r#type: IqType::Set, -                    lang: None, -                    query: Some(iq::Query::Roster(stanza::roster::Query { -                        ver: None, -                        items: vec![stanza::roster::Item { -                            approved: None, -                            ask: false, -                            jid, -                            name: contact_update.name, -                            subscription: None, -                            groups, -                        }], -                    })), -                    errors: Vec::new(), -                }); -                let (send, recv) = oneshot::channel(); -                { -                    self.pending.lock().await.insert(iq_id.clone(), send); -                } -                let result = connection.write_handle().write(set_stanza).await; -                if let Err(e) = result { -                    sender.send(Err(RosterError::Write(e))); -                    return; -                } -                let iq_result = recv.await; -                match iq_result { -                    Ok(i) => match i { -                        Ok(iq_result) => match iq_result { -                            Stanza::Iq(Iq { -                                from: _, -                                id, -                                to: _, -                                r#type, -                                lang: _, -                                query: _, -                                errors: _, -                            }) if id == iq_id && r#type == IqType::Result => { -                                sender.send(Ok(())); -                                return; -                            } -                            ref s @ Stanza::Iq(Iq { -                                from: _, -                                ref id, -                                to: _, -                                r#type, -                                lang: _, -                                query: _, -                                ref errors, -                            }) if *id == iq_id && r#type == IqType::Error => { -                                if let Some(error) = errors.first() { -                                    sender.send(Err(RosterError::StanzaError(error.clone()))); -                                } else { -                                    sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); -                                } -                                return; -                            } -                            s => { -                                sender.send(Err(RosterError::UnexpectedStanza(s))); -                                return; -                            } -                        }, -                        Err(e) => { -                            sender.send(Err(e.into())); -                            return; -                        } -                    }, -                    Err(e) => { -                        sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); -                        return; -                    } -                } -            } -            Command::SetStatus(online, sender) => { -                let result = self.db.upsert_cached_status(online.clone()).await; -                if let Err(e) = result { -                    let _ = self -                        .update_sender -                        .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( -                            e.into(), -                        )))) -                        .await; -                } -                let result = connection -                    .write_handle() -                    .write(Stanza::Presence(online.into_stanza(None))) -                    .await -                    .map_err(|e| StatusError::Write(e)); -                // .map_err(|e| StatusError::Write(e)); -                let _ = sender.send(result); -            } -            // TODO: offline message queue -            Command::SendMessage(jid, body, sender) => { -                let id = Uuid::new_v4(); -                let message = Stanza::Message(stanza::client::message::Message { -                    from: Some(connection.jid().clone()), -                    id: Some(id.to_string()), -                    to: Some(jid.clone()), -                    // TODO: specify message type -                    r#type: stanza::client::message::MessageType::Chat, -                    // TODO: lang ? -                    lang: None, -                    subject: None, -                    body: Some(stanza::client::message::Body { -                        lang: None, -                        body: Some(body.body.clone()), -                    }), -                    thread: None, -                    delay: None, -                }); -                let _ = sender.send(Ok(())); -                // let _ = sender.send(Ok(message.clone())); -                let result = connection.write_handle().write(message).await; -                match result { -                    Ok(_) => { -                        let mut message = Message { -                            id, -                            from: connection.jid().clone(), -                            body, -                            timestamp: Utc::now(), -                        }; -                        info!("send message {:?}", message); -                        if let Err(e) = self -                            .db -                            .create_message_with_self_resource_and_chat( -                                message.clone(), -                                jid.clone(), -                            ) -                            .await -                            .map_err(|e| e.into()) -                        { -                            tracing::error!("{}", e); -                            let _ = -                                self.update_sender -                                    .send(UpdateMessage::Error(Error::MessageSend( -                                        error::MessageSendError::MessageHistory(e), -                                    ))); -                        } -                        // TODO: don't do this, have separate from from details -                        message.from = message.from.as_bare(); -                        let _ = self -                            .update_sender -                            .send(UpdateMessage::Message { to: jid, message }) -                            .await; -                    } -                    Err(_) => { -                        // let _ = sender.send(result); -                    } -                } -            } -            Command::SendPresence(jid, presence, sender) => { -                let mut presence: stanza::client::presence::Presence = presence.into(); -                if let Some(jid) = jid { -                    presence.to = Some(jid); -                }; -                let result = connection -                    .write_handle() -                    .write(Stanza::Presence(presence)) -                    .await; -                // .map_err(|e| StatusError::Write(e)); -                let _ = sender.send(result); -            } -        } -    } - -    async fn handle_offline(self, command: Command) { -        match command { -            Command::GetRoster(sender) => { -                let roster = self.db.read_cached_roster().await; -                match roster { -                    Ok(roster) => { -                        let _ = sender.send(Ok(roster)); -                    } -                    Err(e) => { -                        let _ = sender.send(Err(RosterError::Cache(e.into()))); -                    } -                } -            } -            Command::GetChats(sender) => { -                let chats = self.db.read_chats().await.map_err(|e| e.into()); -                sender.send(chats); -            } -            Command::GetChatsOrdered(sender) => { -                let chats = self.db.read_chats_ordered().await.map_err(|e| e.into()); -                sender.send(chats); -            } -            Command::GetChatsOrderedWithLatestMessages(sender) => { -                let chats = self -                    .db -                    .read_chats_ordered_with_latest_messages() -                    .await -                    .map_err(|e| e.into()); -                sender.send(chats); -            } -            Command::GetChat(jid, sender) => { -                let chats = self.db.read_chat(jid).await.map_err(|e| e.into()); -                sender.send(chats); -            } -            Command::GetMessages(jid, sender) => { -                let messages = self -                    .db -                    .read_message_history(jid) -                    .await -                    .map_err(|e| e.into()); -                sender.send(messages); -            } -            Command::DeleteChat(jid, sender) => { -                let result = self.db.delete_chat(jid).await.map_err(|e| e.into()); -                sender.send(result); -            } -            Command::DeleteMessage(uuid, sender) => { -                let result = self.db.delete_message(uuid).await.map_err(|e| e.into()); -                sender.send(result); -            } -            Command::GetUser(jid, sender) => { -                let user = self.db.read_user(jid).await.map_err(|e| e.into()); -                sender.send(user); -            } -            // TODO: offline queue to modify roster -            Command::AddContact(_jid, sender) => { -                sender.send(Err(RosterError::Write(WriteError::Disconnected))); -            } -            Command::BuddyRequest(_jid, sender) => { -                sender.send(Err(WriteError::Disconnected)); -            } -            Command::SubscriptionRequest(_jid, sender) => { -                sender.send(Err(WriteError::Disconnected)); -            } -            Command::AcceptBuddyRequest(_jid, sender) => { -                sender.send(Err(WriteError::Disconnected)); -            } -            Command::AcceptSubscriptionRequest(_jid, sender) => { -                sender.send(Err(WriteError::Disconnected)); -            } -            Command::UnsubscribeFromContact(_jid, sender) => { -                sender.send(Err(WriteError::Disconnected)); -            } -            Command::UnsubscribeContact(_jid, sender) => { -                sender.send(Err(WriteError::Disconnected)); -            } -            Command::UnfriendContact(_jid, sender) => { -                sender.send(Err(WriteError::Disconnected)); -            } -            Command::DeleteContact(_jid, sender) => { -                sender.send(Err(RosterError::Write(WriteError::Disconnected))); -            } -            Command::UpdateContact(_jid, _contact_update, sender) => { -                sender.send(Err(RosterError::Write(WriteError::Disconnected))); -            } -            Command::SetStatus(online, sender) => { -                let result = self -                    .db -                    .upsert_cached_status(online) -                    .await -                    .map_err(|e| StatusError::Cache(e.into())); -                sender.send(result); -            } -            // TODO: offline message queue -            Command::SendMessage(_jid, _body, sender) => { -                sender.send(Err(WriteError::Disconnected)); -            } -            Command::SendPresence(_jid, _presence, sender) => { -                sender.send(Err(WriteError::Disconnected)); -            } -        } -    } -    // pub async fn handle_stream_error(self, error) {} -    // stanza errors (recoverable) -    // pub async fn handle_error(self, error: Error) {} -    // when it aborts, must clear iq map no matter what -    async fn on_abort(self) { -        let mut iqs = self.pending.lock().await; -        for (_id, sender) in iqs.drain() { -            let _ = sender.send(Err(ReadError::LostConnection)); -        } -    } - -    async fn handle_connection_error(self, error: ConnectionError) { -        self.update_sender -            .send(UpdateMessage::Error( -                ConnectionError::AlreadyConnected.into(), -            )) -            .await; -    } -} -  impl From<Command> for CoreClientCommand<Command> {      fn from(value: Command) -> Self {          CoreClientCommand::Command(value) diff --git a/filamento/src/logic/abort.rs b/filamento/src/logic/abort.rs new file mode 100644 index 0000000..32c4823 --- /dev/null +++ b/filamento/src/logic/abort.rs @@ -0,0 +1,10 @@ +use lampada::error::ReadError; + +use super::ClientLogic; + +pub async fn on_abort(logic: ClientLogic) { +    let mut iqs = logic.pending().lock().await; +    for (_id, sender) in iqs.drain() { +        let _ = sender.send(Err(ReadError::LostConnection)); +    } +} diff --git a/filamento/src/logic/connect.rs b/filamento/src/logic/connect.rs new file mode 100644 index 0000000..4dc789e --- /dev/null +++ b/filamento/src/logic/connect.rs @@ -0,0 +1,91 @@ +use lampada::{Connected, Logic, error::WriteError}; +use tokio::sync::oneshot; +use tracing::debug; + +use crate::{ +    Command, UpdateMessage, +    error::{ConnectionJobError, Error, RosterError}, +    presence::{Online, PresenceType}, +}; + +use super::ClientLogic; + +pub async fn handle_connect(logic: ClientLogic, connection: Connected) { +    let (send, recv) = oneshot::channel(); +    debug!("getting roster"); +    logic +        .clone() +        .handle_online(Command::GetRoster(send), connection.clone()) +        .await; +    debug!("sent roster req"); +    let roster = recv.await; +    debug!("got roster"); +    match roster { +        Ok(r) => match r { +            Ok(roster) => { +                let online = logic.db().read_cached_status().await; +                let online = match online { +                    Ok(online) => online, +                    Err(e) => { +                        let _ = logic +                            .update_sender() +                            .send(UpdateMessage::Error(Error::Connecting( +                                ConnectionJobError::StatusCacheError(e.into()), +                            ))) +                            .await; +                        Online::default() +                    } +                }; +                let (send, recv) = oneshot::channel(); +                logic +                    .clone() +                    .handle_online( +                        Command::SendPresence(None, PresenceType::Online(online.clone()), send), +                        connection, +                    ) +                    .await; +                let set_status = recv.await; +                match set_status { +                    Ok(s) => match s { +                        Ok(()) => { +                            let _ = logic +                                .update_sender() +                                .send(UpdateMessage::Online(online, roster)) +                                .await; +                        } +                        Err(e) => { +                            let _ = logic +                                .update_sender() +                                .send(UpdateMessage::Error(Error::Connecting(e.into()))) +                                .await; +                        } +                    }, +                    Err(e) => { +                        let _ = logic +                            .update_sender() +                            .send(UpdateMessage::Error(Error::Connecting( +                                ConnectionJobError::SendPresence(WriteError::Actor(e.into())), +                            ))) +                            .await; +                    } +                } +            } +            Err(e) => { +                let _ = logic +                    .update_sender() +                    .send(UpdateMessage::Error(Error::Connecting(e.into()))) +                    .await; +            } +        }, +        Err(e) => { +            let _ = logic +                .update_sender() +                .send(UpdateMessage::Error(Error::Connecting( +                    ConnectionJobError::RosterRetreival(RosterError::Write(WriteError::Actor( +                        e.into(), +                    ))), +                ))) +                .await; +        } +    } +} diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs new file mode 100644 index 0000000..ac9e931 --- /dev/null +++ b/filamento/src/logic/connection_error.rs @@ -0,0 +1,12 @@ +use lampada::error::ConnectionError; + +use crate::UpdateMessage; + +use super::ClientLogic; + +pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) { +    logic +        .update_sender() +        .send(UpdateMessage::Error(error.into())) +        .await; +} diff --git a/filamento/src/logic/disconnect.rs b/filamento/src/logic/disconnect.rs new file mode 100644 index 0000000..241c3e6 --- /dev/null +++ b/filamento/src/logic/disconnect.rs @@ -0,0 +1,18 @@ +use lampada::Connected; +use stanza::client::Stanza; + +use crate::{UpdateMessage, presence::Offline}; + +use super::ClientLogic; + +pub async fn handle_disconnect(logic: ClientLogic, connection: Connected) { +    // TODO: be able to set offline status message +    let offline_presence: stanza::client::presence::Presence = Offline::default().into_stanza(None); +    let stanza = Stanza::Presence(offline_presence); +    // TODO: timeout and error check +    connection.write_handle().write(stanza).await; +    let _ = logic +        .update_sender() +        .send(UpdateMessage::Offline(Offline::default())) +        .await; +} diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs new file mode 100644 index 0000000..638f682 --- /dev/null +++ b/filamento/src/logic/mod.rs @@ -0,0 +1,90 @@ +use std::{collections::HashMap, sync::Arc}; + +use lampada::{Logic, error::ReadError}; +use stanza::client::Stanza; +use tokio::sync::{Mutex, mpsc, oneshot}; + +use crate::{Command, UpdateMessage, db::Db}; + +mod abort; +mod connect; +mod connection_error; +mod disconnect; +mod offline; +mod online; +mod process_stanza; + +#[derive(Clone)] +pub struct ClientLogic { +    db: Db, +    pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, +    update_sender: mpsc::Sender<UpdateMessage>, +} + +impl ClientLogic { +    pub fn new( +        db: Db, +        pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>, +        update_sender: mpsc::Sender<UpdateMessage>, +    ) -> Self { +        Self { +            db, +            pending, +            update_sender, +        } +    } + +    pub fn db(&self) -> &Db { +        &self.db +    } + +    pub fn pending(&self) -> &Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>> { +        &self.pending.as_ref() +    } + +    pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> { +        &self.update_sender +    } +} + +impl Logic for ClientLogic { +    type Cmd = Command; + +    // pub async fn handle_stream_error(self, error) {} +    // stanza errors (recoverable) +    // pub async fn handle_error(self, error: Error) {} +    // when it aborts, must clear iq map no matter what + +    async fn handle_connect(self, connection: lampada::Connected) { +        connect::handle_connect(self, connection).await; +    } + +    async fn handle_disconnect(self, connection: lampada::Connected) { +        disconnect::handle_disconnect(self, connection).await; +    } + +    async fn handle_stanza( +        self, +        stanza: ::stanza::client::Stanza, +        connection: lampada::Connected, +        supervisor: lampada::SupervisorSender, +    ) { +        process_stanza::handle_stanza(self, stanza, connection, supervisor).await; +    } + +    async fn handle_online(self, command: Self::Cmd, connection: lampada::Connected) { +        online::handle_online(self, command, connection).await; +    } + +    async fn handle_offline(self, command: Self::Cmd) { +        offline::handle_offline(self, command).await; +    } + +    async fn on_abort(self) { +        abort::on_abort(self).await; +    } + +    async fn handle_connection_error(self, error: lampada::error::ConnectionError) { +        connection_error::handle_connection_error(self, error).await; +    } +} diff --git a/filamento/src/logic/offline.rs b/filamento/src/logic/offline.rs new file mode 100644 index 0000000..17a60f3 --- /dev/null +++ b/filamento/src/logic/offline.rs @@ -0,0 +1,110 @@ +use lampada::error::WriteError; + +use crate::{ +    Command, +    error::{RosterError, StatusError}, +}; + +use super::ClientLogic; + +pub async fn handle_offline(logic: ClientLogic, command: Command) { +    match command { +        Command::GetRoster(sender) => { +            let roster = logic.db().read_cached_roster().await; +            match roster { +                Ok(roster) => { +                    let _ = sender.send(Ok(roster)); +                } +                Err(e) => { +                    let _ = sender.send(Err(RosterError::Cache(e.into()))); +                } +            } +        } +        Command::GetChats(sender) => { +            let chats = logic.db().read_chats().await.map_err(|e| e.into()); +            sender.send(chats); +        } +        Command::GetChatsOrdered(sender) => { +            let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into()); +            sender.send(chats); +        } +        Command::GetChatsOrderedWithLatestMessages(sender) => { +            let chats = logic +                .db() +                .read_chats_ordered_with_latest_messages() +                .await +                .map_err(|e| e.into()); +            sender.send(chats); +        } +        Command::GetChat(jid, sender) => { +            let chats = logic.db().read_chat(jid).await.map_err(|e| e.into()); +            sender.send(chats); +        } +        Command::GetMessages(jid, sender) => { +            let messages = logic +                .db() +                .read_message_history(jid) +                .await +                .map_err(|e| e.into()); +            sender.send(messages); +        } +        Command::DeleteChat(jid, sender) => { +            let result = logic.db().delete_chat(jid).await.map_err(|e| e.into()); +            sender.send(result); +        } +        Command::DeleteMessage(uuid, sender) => { +            let result = logic.db().delete_message(uuid).await.map_err(|e| e.into()); +            sender.send(result); +        } +        Command::GetUser(jid, sender) => { +            let user = logic.db().read_user(jid).await.map_err(|e| e.into()); +            sender.send(user); +        } +        // TODO: offline queue to modify roster +        Command::AddContact(_jid, sender) => { +            sender.send(Err(RosterError::Write(WriteError::Disconnected))); +        } +        Command::BuddyRequest(_jid, sender) => { +            sender.send(Err(WriteError::Disconnected)); +        } +        Command::SubscriptionRequest(_jid, sender) => { +            sender.send(Err(WriteError::Disconnected)); +        } +        Command::AcceptBuddyRequest(_jid, sender) => { +            sender.send(Err(WriteError::Disconnected)); +        } +        Command::AcceptSubscriptionRequest(_jid, sender) => { +            sender.send(Err(WriteError::Disconnected)); +        } +        Command::UnsubscribeFromContact(_jid, sender) => { +            sender.send(Err(WriteError::Disconnected)); +        } +        Command::UnsubscribeContact(_jid, sender) => { +            sender.send(Err(WriteError::Disconnected)); +        } +        Command::UnfriendContact(_jid, sender) => { +            sender.send(Err(WriteError::Disconnected)); +        } +        Command::DeleteContact(_jid, sender) => { +            sender.send(Err(RosterError::Write(WriteError::Disconnected))); +        } +        Command::UpdateContact(_jid, _contact_update, sender) => { +            sender.send(Err(RosterError::Write(WriteError::Disconnected))); +        } +        Command::SetStatus(online, sender) => { +            let result = logic +                .db() +                .upsert_cached_status(online) +                .await +                .map_err(|e| StatusError::Cache(e.into())); +            sender.send(result); +        } +        // TODO: offline message queue +        Command::SendMessage(_jid, _body, sender) => { +            sender.send(Err(WriteError::Disconnected)); +        } +        Command::SendPresence(_jid, _presence, sender) => { +            sender.send(Err(WriteError::Disconnected)); +        } +    } +} diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs new file mode 100644 index 0000000..e8cbb33 --- /dev/null +++ b/filamento/src/logic/online.rs @@ -0,0 +1,661 @@ +use chrono::Utc; +use lampada::{Connected, WriteMessage, error::WriteError}; +use stanza::client::{ +    Stanza, +    iq::{self, Iq, IqType}, +}; +use tokio::sync::oneshot; +use tracing::{debug, info}; +use uuid::Uuid; + +use crate::{ +    Command, UpdateMessage, +    chat::Message, +    error::{Error, MessageSendError, RosterError, StatusError}, +    roster::Contact, +}; + +use super::ClientLogic; + +pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { +    match command { +        Command::GetRoster(result_sender) => { +            let iq_id = Uuid::new_v4().to_string(); +            let (send, iq_recv) = oneshot::channel(); +            { +                logic.pending().lock().await.insert(iq_id.clone(), send); +            } +            let stanza = Stanza::Iq(Iq { +                from: Some(connection.jid().clone()), +                id: iq_id.to_string(), +                to: None, +                r#type: IqType::Get, +                lang: None, +                query: Some(iq::Query::Roster(stanza::roster::Query { +                    ver: None, +                    items: Vec::new(), +                })), +                errors: Vec::new(), +            }); +            let (send, recv) = oneshot::channel(); +            let _ = connection +                .write_handle() +                .send(WriteMessage { +                    stanza, +                    respond_to: send, +                }) +                .await; +            // TODO: timeout +            match recv.await { +                Ok(Ok(())) => info!("roster request sent"), +                Ok(Err(e)) => { +                    // TODO: log errors if fail to send +                    let _ = result_sender.send(Err(RosterError::Write(e.into()))); +                    return; +                } +                Err(e) => { +                    let _ = +                        result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); +                    return; +                } +            }; +            // TODO: timeout +            match iq_recv.await { +                Ok(Ok(stanza)) => match stanza { +                    Stanza::Iq(Iq { +                        from: _, +                        id, +                        to: _, +                        r#type, +                        lang: _, +                        query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), +                        errors: _, +                    }) if id == iq_id && r#type == IqType::Result => { +                        let contacts: Vec<Contact> = +                            items.into_iter().map(|item| item.into()).collect(); +                        if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { +                            logic +                                .update_sender() +                                .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( +                                    e.into(), +                                )))) +                                .await; +                        }; +                        result_sender.send(Ok(contacts)); +                        return; +                    } +                    ref s @ Stanza::Iq(Iq { +                        from: _, +                        ref id, +                        to: _, +                        r#type, +                        lang: _, +                        query: _, +                        ref errors, +                    }) if *id == iq_id && r#type == IqType::Error => { +                        if let Some(error) = errors.first() { +                            result_sender.send(Err(RosterError::StanzaError(error.clone()))); +                        } else { +                            result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); +                        } +                        return; +                    } +                    s => { +                        result_sender.send(Err(RosterError::UnexpectedStanza(s))); +                        return; +                    } +                }, +                Ok(Err(e)) => { +                    result_sender.send(Err(RosterError::Read(e))); +                    return; +                } +                Err(e) => { +                    result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); +                    return; +                } +            } +        } +        Command::GetChats(sender) => { +            let chats = logic.db().read_chats().await.map_err(|e| e.into()); +            sender.send(chats); +        } +        Command::GetChatsOrdered(sender) => { +            let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into()); +            sender.send(chats); +        } +        Command::GetChatsOrderedWithLatestMessages(sender) => { +            let chats = logic +                .db() +                .read_chats_ordered_with_latest_messages() +                .await +                .map_err(|e| e.into()); +            sender.send(chats); +        } +        Command::GetChat(jid, sender) => { +            let chats = logic.db().read_chat(jid).await.map_err(|e| e.into()); +            sender.send(chats); +        } +        Command::GetMessages(jid, sender) => { +            let messages = logic +                .db() +                .read_message_history(jid) +                .await +                .map_err(|e| e.into()); +            sender.send(messages); +        } +        Command::DeleteChat(jid, sender) => { +            let result = logic.db().delete_chat(jid).await.map_err(|e| e.into()); +            sender.send(result); +        } +        Command::DeleteMessage(uuid, sender) => { +            let result = logic.db().delete_message(uuid).await.map_err(|e| e.into()); +            sender.send(result); +        } +        Command::GetUser(jid, sender) => { +            let user = logic.db().read_user(jid).await.map_err(|e| e.into()); +            sender.send(user); +        } +        // TODO: offline queue to modify roster +        Command::AddContact(jid, sender) => { +            let iq_id = Uuid::new_v4().to_string(); +            let set_stanza = Stanza::Iq(Iq { +                from: Some(connection.jid().clone()), +                id: iq_id.clone(), +                to: None, +                r#type: IqType::Set, +                lang: None, +                query: Some(iq::Query::Roster(stanza::roster::Query { +                    ver: None, +                    items: vec![stanza::roster::Item { +                        approved: None, +                        ask: false, +                        jid, +                        name: None, +                        subscription: None, +                        groups: Vec::new(), +                    }], +                })), +                errors: Vec::new(), +            }); +            let (send, recv) = oneshot::channel(); +            { +                logic.pending().lock().await.insert(iq_id.clone(), send); +            } +            // TODO: write_handle send helper function +            let result = connection.write_handle().write(set_stanza).await; +            if let Err(e) = result { +                sender.send(Err(RosterError::Write(e))); +                return; +            } +            let iq_result = recv.await; +            match iq_result { +                Ok(i) => match i { +                    Ok(iq_result) => match iq_result { +                        Stanza::Iq(Iq { +                            from: _, +                            id, +                            to: _, +                            r#type, +                            lang: _, +                            query: _, +                            errors: _, +                        }) if id == iq_id && r#type == IqType::Result => { +                            sender.send(Ok(())); +                            return; +                        } +                        ref s @ Stanza::Iq(Iq { +                            from: _, +                            ref id, +                            to: _, +                            r#type, +                            lang: _, +                            query: _, +                            ref errors, +                        }) if *id == iq_id && r#type == IqType::Error => { +                            if let Some(error) = errors.first() { +                                sender.send(Err(RosterError::StanzaError(error.clone()))); +                            } else { +                                sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); +                            } +                            return; +                        } +                        s => { +                            sender.send(Err(RosterError::UnexpectedStanza(s))); +                            return; +                        } +                    }, +                    Err(e) => { +                        sender.send(Err(e.into())); +                        return; +                    } +                }, +                Err(e) => { +                    sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); +                    return; +                } +            } +        } +        Command::BuddyRequest(jid, sender) => { +            let presence = Stanza::Presence(stanza::client::presence::Presence { +                from: None, +                id: None, +                to: Some(jid.clone()), +                r#type: Some(stanza::client::presence::PresenceType::Subscribe), +                lang: None, +                show: None, +                status: None, +                priority: None, +                errors: Vec::new(), +                delay: None, +            }); +            let result = connection.write_handle().write(presence).await; +            match result { +                Err(_) => { +                    let _ = sender.send(result); +                } +                Ok(()) => { +                    let presence = Stanza::Presence(stanza::client::presence::Presence { +                        from: None, +                        id: None, +                        to: Some(jid), +                        r#type: Some(stanza::client::presence::PresenceType::Subscribed), +                        lang: None, +                        show: None, +                        status: None, +                        priority: None, +                        errors: Vec::new(), +                        delay: None, +                    }); +                    let result = connection.write_handle().write(presence).await; +                    let _ = sender.send(result); +                } +            } +        } +        Command::SubscriptionRequest(jid, sender) => { +            // TODO: i should probably have builders +            let presence = Stanza::Presence(stanza::client::presence::Presence { +                from: None, +                id: None, +                to: Some(jid), +                r#type: Some(stanza::client::presence::PresenceType::Subscribe), +                lang: None, +                show: None, +                status: None, +                priority: None, +                errors: Vec::new(), +                delay: None, +            }); +            let result = connection.write_handle().write(presence).await; +            let _ = sender.send(result); +        } +        Command::AcceptBuddyRequest(jid, sender) => { +            let presence = Stanza::Presence(stanza::client::presence::Presence { +                from: None, +                id: None, +                to: Some(jid.clone()), +                r#type: Some(stanza::client::presence::PresenceType::Subscribed), +                lang: None, +                show: None, +                status: None, +                priority: None, +                errors: Vec::new(), +                delay: None, +            }); +            let result = connection.write_handle().write(presence).await; +            match result { +                Err(_) => { +                    let _ = sender.send(result); +                } +                Ok(()) => { +                    let presence = Stanza::Presence(stanza::client::presence::Presence { +                        from: None, +                        id: None, +                        to: Some(jid), +                        r#type: Some(stanza::client::presence::PresenceType::Subscribe), +                        lang: None, +                        show: None, +                        status: None, +                        priority: None, +                        errors: Vec::new(), +                        delay: None, +                    }); +                    let result = connection.write_handle().write(presence).await; +                    let _ = sender.send(result); +                } +            } +        } +        Command::AcceptSubscriptionRequest(jid, sender) => { +            let presence = Stanza::Presence(stanza::client::presence::Presence { +                from: None, +                id: None, +                to: Some(jid), +                r#type: Some(stanza::client::presence::PresenceType::Subscribe), +                lang: None, +                show: None, +                status: None, +                priority: None, +                errors: Vec::new(), +                delay: None, +            }); +            let result = connection.write_handle().write(presence).await; +            let _ = sender.send(result); +        } +        Command::UnsubscribeFromContact(jid, sender) => { +            let presence = Stanza::Presence(stanza::client::presence::Presence { +                from: None, +                id: None, +                to: Some(jid), +                r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), +                lang: None, +                show: None, +                status: None, +                priority: None, +                errors: Vec::new(), +                delay: None, +            }); +            let result = connection.write_handle().write(presence).await; +            let _ = sender.send(result); +        } +        Command::UnsubscribeContact(jid, sender) => { +            let presence = Stanza::Presence(stanza::client::presence::Presence { +                from: None, +                id: None, +                to: Some(jid), +                r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), +                lang: None, +                show: None, +                status: None, +                priority: None, +                errors: Vec::new(), +                delay: None, +            }); +            let result = connection.write_handle().write(presence).await; +            let _ = sender.send(result); +        } +        Command::UnfriendContact(jid, sender) => { +            let presence = Stanza::Presence(stanza::client::presence::Presence { +                from: None, +                id: None, +                to: Some(jid.clone()), +                r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), +                lang: None, +                show: None, +                status: None, +                priority: None, +                errors: Vec::new(), +                delay: None, +            }); +            let result = connection.write_handle().write(presence).await; +            match result { +                Err(_) => { +                    let _ = sender.send(result); +                } +                Ok(()) => { +                    let presence = Stanza::Presence(stanza::client::presence::Presence { +                        from: None, +                        id: None, +                        to: Some(jid), +                        r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), +                        lang: None, +                        show: None, +                        status: None, +                        priority: None, +                        errors: Vec::new(), +                        delay: None, +                    }); +                    let result = connection.write_handle().write(presence).await; +                    let _ = sender.send(result); +                } +            } +        } +        Command::DeleteContact(jid, sender) => { +            let iq_id = Uuid::new_v4().to_string(); +            let set_stanza = Stanza::Iq(Iq { +                from: Some(connection.jid().clone()), +                id: iq_id.clone(), +                to: None, +                r#type: IqType::Set, +                lang: None, +                query: Some(iq::Query::Roster(stanza::roster::Query { +                    ver: None, +                    items: vec![stanza::roster::Item { +                        approved: None, +                        ask: false, +                        jid, +                        name: None, +                        subscription: Some(stanza::roster::Subscription::Remove), +                        groups: Vec::new(), +                    }], +                })), +                errors: Vec::new(), +            }); +            let (send, recv) = oneshot::channel(); +            { +                logic.pending().lock().await.insert(iq_id.clone(), send); +            } +            let result = connection.write_handle().write(set_stanza).await; +            if let Err(e) = result { +                sender.send(Err(RosterError::Write(e))); +                return; +            } +            let iq_result = recv.await; +            match iq_result { +                Ok(i) => match i { +                    Ok(iq_result) => match iq_result { +                        Stanza::Iq(Iq { +                            from: _, +                            id, +                            to: _, +                            r#type, +                            lang: _, +                            query: _, +                            errors: _, +                        }) if id == iq_id && r#type == IqType::Result => { +                            sender.send(Ok(())); +                            return; +                        } +                        ref s @ Stanza::Iq(Iq { +                            from: _, +                            ref id, +                            to: _, +                            r#type, +                            lang: _, +                            query: _, +                            ref errors, +                        }) if *id == iq_id && r#type == IqType::Error => { +                            if let Some(error) = errors.first() { +                                sender.send(Err(RosterError::StanzaError(error.clone()))); +                            } else { +                                sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); +                            } +                            return; +                        } +                        s => { +                            sender.send(Err(RosterError::UnexpectedStanza(s))); +                            return; +                        } +                    }, +                    Err(e) => { +                        sender.send(Err(e.into())); +                        return; +                    } +                }, +                Err(e) => { +                    sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); +                    return; +                } +            } +        } +        Command::UpdateContact(jid, contact_update, sender) => { +            let iq_id = Uuid::new_v4().to_string(); +            let groups = Vec::from_iter( +                contact_update +                    .groups +                    .into_iter() +                    .map(|group| stanza::roster::Group(Some(group))), +            ); +            let set_stanza = Stanza::Iq(Iq { +                from: Some(connection.jid().clone()), +                id: iq_id.clone(), +                to: None, +                r#type: IqType::Set, +                lang: None, +                query: Some(iq::Query::Roster(stanza::roster::Query { +                    ver: None, +                    items: vec![stanza::roster::Item { +                        approved: None, +                        ask: false, +                        jid, +                        name: contact_update.name, +                        subscription: None, +                        groups, +                    }], +                })), +                errors: Vec::new(), +            }); +            let (send, recv) = oneshot::channel(); +            { +                logic.pending().lock().await.insert(iq_id.clone(), send); +            } +            let result = connection.write_handle().write(set_stanza).await; +            if let Err(e) = result { +                sender.send(Err(RosterError::Write(e))); +                return; +            } +            let iq_result = recv.await; +            match iq_result { +                Ok(i) => match i { +                    Ok(iq_result) => match iq_result { +                        Stanza::Iq(Iq { +                            from: _, +                            id, +                            to: _, +                            r#type, +                            lang: _, +                            query: _, +                            errors: _, +                        }) if id == iq_id && r#type == IqType::Result => { +                            sender.send(Ok(())); +                            return; +                        } +                        ref s @ Stanza::Iq(Iq { +                            from: _, +                            ref id, +                            to: _, +                            r#type, +                            lang: _, +                            query: _, +                            ref errors, +                        }) if *id == iq_id && r#type == IqType::Error => { +                            if let Some(error) = errors.first() { +                                sender.send(Err(RosterError::StanzaError(error.clone()))); +                            } else { +                                sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); +                            } +                            return; +                        } +                        s => { +                            sender.send(Err(RosterError::UnexpectedStanza(s))); +                            return; +                        } +                    }, +                    Err(e) => { +                        sender.send(Err(e.into())); +                        return; +                    } +                }, +                Err(e) => { +                    sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); +                    return; +                } +            } +        } +        Command::SetStatus(online, sender) => { +            let result = logic.db().upsert_cached_status(online.clone()).await; +            if let Err(e) = result { +                let _ = logic +                    .update_sender() +                    .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( +                        e.into(), +                    )))) +                    .await; +            } +            let result = connection +                .write_handle() +                .write(Stanza::Presence(online.into_stanza(None))) +                .await +                .map_err(|e| StatusError::Write(e)); +            // .map_err(|e| StatusError::Write(e)); +            let _ = sender.send(result); +        } +        // TODO: offline message queue +        Command::SendMessage(jid, body, sender) => { +            let id = Uuid::new_v4(); +            let message = Stanza::Message(stanza::client::message::Message { +                from: Some(connection.jid().clone()), +                id: Some(id.to_string()), +                to: Some(jid.clone()), +                // TODO: specify message type +                r#type: stanza::client::message::MessageType::Chat, +                // TODO: lang ? +                lang: None, +                subject: None, +                body: Some(stanza::client::message::Body { +                    lang: None, +                    body: Some(body.body.clone()), +                }), +                thread: None, +                delay: None, +            }); +            let _ = sender.send(Ok(())); +            // let _ = sender.send(Ok(message.clone())); +            let result = connection.write_handle().write(message).await; +            match result { +                Ok(_) => { +                    let mut message = Message { +                        id, +                        from: connection.jid().clone(), +                        body, +                        timestamp: Utc::now(), +                    }; +                    info!("send message {:?}", message); +                    if let Err(e) = logic +                        .db() +                        .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) +                        .await +                        .map_err(|e| e.into()) +                    { +                        tracing::error!("{}", e); +                        let _ = +                            logic +                                .update_sender() +                                .send(UpdateMessage::Error(Error::MessageSend( +                                    MessageSendError::MessageHistory(e), +                                ))); +                    } +                    // TODO: don't do this, have separate from from details +                    message.from = message.from.as_bare(); +                    let _ = logic +                        .update_sender() +                        .send(UpdateMessage::Message { to: jid, message }) +                        .await; +                } +                Err(_) => { +                    // let _ = sender.send(result); +                } +            } +        } +        Command::SendPresence(jid, presence, sender) => { +            let mut presence: stanza::client::presence::Presence = presence.into(); +            if let Some(jid) = jid { +                presence.to = Some(jid); +            }; +            let result = connection +                .write_handle() +                .write(Stanza::Presence(presence)) +                .await; +            // .map_err(|e| StatusError::Write(e)); +            let _ = sender.send(result); +        } +    } +} diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs new file mode 100644 index 0000000..17738df --- /dev/null +++ b/filamento/src/logic/process_stanza.rs @@ -0,0 +1,264 @@ +use std::str::FromStr; + +use chrono::Utc; +use lampada::{Connected, SupervisorSender}; +use stanza::client::Stanza; +use uuid::Uuid; + +use crate::{ +    UpdateMessage, +    chat::{Body, Message}, +    error::{Error, IqError, MessageRecvError, PresenceError, RosterError}, +    presence::{Offline, Online, Presence, PresenceType, Show}, +    roster::Contact, +}; + +use super::ClientLogic; + +pub async fn handle_stanza( +    logic: ClientLogic, +    stanza: Stanza, +    connection: Connected, +    supervisor: SupervisorSender, +) { +    match stanza { +        Stanza::Message(stanza_message) => { +            if let Some(mut from) = stanza_message.from { +                // TODO: don't ignore delay from. xep says SHOULD send error if incorrect. +                let timestamp = stanza_message +                    .delay +                    .map(|delay| delay.stamp) +                    .unwrap_or_else(|| Utc::now()); +                // TODO: group chat messages +                let mut message = Message { +                    id: stanza_message +                        .id +                        // TODO: proper id storage +                        .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4())) +                        .unwrap_or_else(|| Uuid::new_v4()), +                    from: from.clone(), +                    timestamp, +                    body: Body { +                        // TODO: should this be an option? +                        body: stanza_message +                            .body +                            .map(|body| body.body) +                            .unwrap_or_default() +                            .unwrap_or_default(), +                    }, +                }; +                // TODO: can this be more efficient? +                let result = logic +                    .db() +                    .create_message_with_user_resource_and_chat(message.clone(), from.clone()) +                    .await; +                if let Err(e) = result { +                    tracing::error!("messagecreate"); +                    let _ = logic +                        .update_sender() +                        .send(UpdateMessage::Error(Error::MessageRecv( +                            MessageRecvError::MessageHistory(e.into()), +                        ))) +                        .await; +                } +                message.from = message.from.as_bare(); +                from = from.as_bare(); +                let _ = logic +                    .update_sender() +                    .send(UpdateMessage::Message { to: from, message }) +                    .await; +            } else { +                let _ = logic +                    .update_sender() +                    .send(UpdateMessage::Error(Error::MessageRecv( +                        MessageRecvError::MissingFrom, +                    ))) +                    .await; +            } +        } +        Stanza::Presence(presence) => { +            if let Some(from) = presence.from { +                match presence.r#type { +                    Some(r#type) => match r#type { +                        // error processing a presence from somebody +                        stanza::client::presence::PresenceType::Error => { +                            // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option. +                            let _ = logic +                                .update_sender() +                                .send(UpdateMessage::Error(Error::Presence( +                                    // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display +                                    PresenceError::StanzaError( +                                        presence +                                            .errors +                                            .first() +                                            .cloned() +                                            .expect("error MUST have error"), +                                    ), +                                ))) +                                .await; +                        } +                        // should not happen (error to server) +                        stanza::client::presence::PresenceType::Probe => { +                            // TODO: should probably write an error and restart stream +                            let _ = logic +                                .update_sender() +                                .send(UpdateMessage::Error(Error::Presence( +                                    PresenceError::Unsupported, +                                ))) +                                .await; +                        } +                        stanza::client::presence::PresenceType::Subscribe => { +                            // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event +                            let _ = logic +                                .update_sender() +                                .send(UpdateMessage::SubscriptionRequest(from)) +                                .await; +                        } +                        stanza::client::presence::PresenceType::Unavailable => { +                            let offline = Offline { +                                status: presence.status.map(|status| status.status.0), +                            }; +                            let timestamp = presence +                                .delay +                                .map(|delay| delay.stamp) +                                .unwrap_or_else(|| Utc::now()); +                            let _ = logic +                                .update_sender() +                                .send(UpdateMessage::Presence { +                                    from, +                                    presence: Presence { +                                        timestamp, +                                        presence: PresenceType::Offline(offline), +                                    }, +                                }) +                                .await; +                        } +                        // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them. +                        stanza::client::presence::PresenceType::Subscribed => {} +                        stanza::client::presence::PresenceType::Unsubscribe => {} +                        stanza::client::presence::PresenceType::Unsubscribed => {} +                    }, +                    None => { +                        let online = Online { +                            show: presence.show.map(|show| match show { +                                stanza::client::presence::Show::Away => Show::Away, +                                stanza::client::presence::Show::Chat => Show::Chat, +                                stanza::client::presence::Show::Dnd => Show::DoNotDisturb, +                                stanza::client::presence::Show::Xa => Show::ExtendedAway, +                            }), +                            status: presence.status.map(|status| status.status.0), +                            priority: presence.priority.map(|priority| priority.0), +                        }; +                        let timestamp = presence +                            .delay +                            .map(|delay| delay.stamp) +                            .unwrap_or_else(|| Utc::now()); +                        let _ = logic +                            .update_sender() +                            .send(UpdateMessage::Presence { +                                from, +                                presence: Presence { +                                    timestamp, +                                    presence: PresenceType::Online(online), +                                }, +                            }) +                            .await; +                    } +                } +            } else { +                let _ = logic +                    .update_sender() +                    .send(UpdateMessage::Error(Error::Presence( +                        PresenceError::MissingFrom, +                    ))) +                    .await; +            } +        } +        Stanza::Iq(iq) => match iq.r#type { +            stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => { +                let send; +                { +                    send = logic.pending().lock().await.remove(&iq.id); +                } +                if let Some(send) = send { +                    send.send(Ok(Stanza::Iq(iq))); +                } else { +                    let _ = logic +                        .update_sender() +                        .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId( +                            iq.id, +                        )))) +                        .await; +                } +            } +            // TODO: send unsupported to server +            // TODO: proper errors i am so tired please +            stanza::client::iq::IqType::Get => {} +            stanza::client::iq::IqType::Set => { +                if let Some(query) = iq.query { +                    match query { +                        stanza::client::iq::Query::Roster(mut query) => { +                            // TODO: there should only be one +                            if let Some(item) = query.items.pop() { +                                match item.subscription { +                                    Some(stanza::roster::Subscription::Remove) => { +                                        logic.db().delete_contact(item.jid.clone()).await; +                                        logic +                                            .update_sender() +                                            .send(UpdateMessage::RosterDelete(item.jid)) +                                            .await; +                                        // TODO: send result +                                    } +                                    _ => { +                                        let contact: Contact = item.into(); +                                        if let Err(e) = +                                            logic.db().upsert_contact(contact.clone()).await +                                        { +                                            let _ = logic +                                                .update_sender() +                                                .send(UpdateMessage::Error(Error::Roster( +                                                    RosterError::Cache(e.into()), +                                                ))) +                                                .await; +                                        } +                                        let _ = logic +                                            .update_sender() +                                            .send(UpdateMessage::RosterUpdate(contact)) +                                            .await; +                                        // TODO: send result +                                        // write_handle.write(Stanza::Iq(stanza::client::iq::Iq { +                                        //     from: , +                                        //     id: todo!(), +                                        //     to: todo!(), +                                        //     r#type: todo!(), +                                        //     lang: todo!(), +                                        //     query: todo!(), +                                        //     errors: todo!(), +                                        // })); +                                    } +                                } +                            } +                        } +                        // TODO: send unsupported to server +                        _ => {} +                    } +                } else { +                    // TODO: send error (unsupported) to server +                } +            } +        }, +        Stanza::Error(error) => { +            let _ = logic +                .update_sender() +                .send(UpdateMessage::Error(Error::Stream(error))) +                .await; +            // TODO: reconnect +        } +        Stanza::OtherContent(content) => { +            let _ = logic +                .update_sender() +                .send(UpdateMessage::Error(Error::UnrecognizedContent)); +            // TODO: send error to write_thread +        } +    } +} | 
