use chrono::Utc; use lampada::{Connected, WriteMessage, error::WriteError}; use stanza::client::{ Stanza, iq::{self, Iq, IqType}, }; use tokio::sync::oneshot; use tracing::{debug, info}; use uuid::Uuid; use crate::{ Command, UpdateMessage, chat::Message, error::{Error, MessageSendError, RosterError, StatusError}, roster::Contact, }; use super::ClientLogic; pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { match command { Command::GetRoster(result_sender) => { let iq_id = Uuid::new_v4().to_string(); let (send, iq_recv) = oneshot::channel(); { logic.pending().lock().await.insert(iq_id.clone(), send); } let stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.to_string(), to: None, r#type: IqType::Get, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: Vec::new(), })), errors: Vec::new(), }); let (send, recv) = oneshot::channel(); let _ = connection .write_handle() .send(WriteMessage { stanza, respond_to: send, }) .await; // TODO: timeout match recv.await { Ok(Ok(())) => info!("roster request sent"), Ok(Err(e)) => { // TODO: log errors if fail to send let _ = result_sender.send(Err(RosterError::Write(e.into()))); return; } Err(e) => { let _ = result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } }; // TODO: timeout match iq_recv.await { Ok(Ok(stanza)) => match stanza { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), errors: _, }) if id == iq_id && r#type == IqType::Result => { let contacts: Vec = items.into_iter().map(|item| item.into()).collect(); if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { logic .update_sender() .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( e.into(), )))) .await; }; result_sender.send(Ok(contacts)); return; } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { result_sender.send(Err(RosterError::StanzaError(error.clone()))); } else { result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { result_sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, Ok(Err(e)) => { result_sender.send(Err(RosterError::Read(e))); return; } Err(e) => { result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } } Command::GetChats(sender) => { let chats = logic.db().read_chats().await.map_err(|e| e.into()); sender.send(chats); } Command::GetChatsOrdered(sender) => { let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into()); sender.send(chats); } Command::GetChatsOrderedWithLatestMessages(sender) => { let chats = logic .db() .read_chats_ordered_with_latest_messages() .await .map_err(|e| e.into()); sender.send(chats); } Command::GetChat(jid, sender) => { let chats = logic.db().read_chat(jid).await.map_err(|e| e.into()); sender.send(chats); } Command::GetMessages(jid, sender) => { let messages = logic .db() .read_message_history(jid) .await .map_err(|e| e.into()); sender.send(messages); } Command::DeleteChat(jid, sender) => { let result = logic.db().delete_chat(jid).await.map_err(|e| e.into()); sender.send(result); } Command::DeleteMessage(uuid, sender) => { let result = logic.db().delete_message(uuid).await.map_err(|e| e.into()); sender.send(result); } Command::GetUser(jid, sender) => { let user = logic.db().read_user(jid).await.map_err(|e| e.into()); sender.send(user); } // TODO: offline queue to modify roster Command::AddContact(jid, sender) => { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: None, subscription: None, groups: Vec::new(), }], })), errors: Vec::new(), }); let (send, recv) = oneshot::channel(); { logic.pending().lock().await.insert(iq_id.clone(), send); } // TODO: write_handle send helper function let result = connection.write_handle().write(set_stanza).await; if let Err(e) = result { sender.send(Err(RosterError::Write(e))); return; } let iq_result = recv.await; match iq_result { Ok(i) => match i { Ok(iq_result) => match iq_result { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, }) if id == iq_id && r#type == IqType::Result => { sender.send(Ok(())); return; } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { sender.send(Err(RosterError::StanzaError(error.clone()))); } else { sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, Err(e) => { sender.send(Err(e.into())); return; } }, Err(e) => { sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } } Command::BuddyRequest(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Subscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = connection.write_handle().write(presence).await; match result { Err(_) => { let _ = sender.send(result); } Ok(()) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribed), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = connection.write_handle().write(presence).await; let _ = sender.send(result); } } } Command::SubscriptionRequest(jid, sender) => { // TODO: i should probably have builders let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = connection.write_handle().write(presence).await; let _ = sender.send(result); } Command::AcceptBuddyRequest(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Subscribed), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = connection.write_handle().write(presence).await; match result { Err(_) => { let _ = sender.send(result); } Ok(()) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = connection.write_handle().write(presence).await; let _ = sender.send(result); } } } Command::AcceptSubscriptionRequest(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Subscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = connection.write_handle().write(presence).await; let _ = sender.send(result); } Command::UnsubscribeFromContact(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = connection.write_handle().write(presence).await; let _ = sender.send(result); } Command::UnsubscribeContact(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = connection.write_handle().write(presence).await; let _ = sender.send(result); } Command::UnfriendContact(jid, sender) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid.clone()), r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = connection.write_handle().write(presence).await; match result { Err(_) => { let _ = sender.send(result); } Ok(()) => { let presence = Stanza::Presence(stanza::client::presence::Presence { from: None, id: None, to: Some(jid), r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), lang: None, show: None, status: None, priority: None, errors: Vec::new(), delay: None, }); let result = connection.write_handle().write(presence).await; let _ = sender.send(result); } } } Command::DeleteContact(jid, sender) => { let iq_id = Uuid::new_v4().to_string(); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: None, subscription: Some(stanza::roster::Subscription::Remove), groups: Vec::new(), }], })), errors: Vec::new(), }); let (send, recv) = oneshot::channel(); { logic.pending().lock().await.insert(iq_id.clone(), send); } let result = connection.write_handle().write(set_stanza).await; if let Err(e) = result { sender.send(Err(RosterError::Write(e))); return; } let iq_result = recv.await; match iq_result { Ok(i) => match i { Ok(iq_result) => match iq_result { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, }) if id == iq_id && r#type == IqType::Result => { sender.send(Ok(())); return; } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { sender.send(Err(RosterError::StanzaError(error.clone()))); } else { sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, Err(e) => { sender.send(Err(e.into())); return; } }, Err(e) => { sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } } Command::UpdateContact(jid, contact_update, sender) => { let iq_id = Uuid::new_v4().to_string(); let groups = Vec::from_iter( contact_update .groups .into_iter() .map(|group| stanza::roster::Group(Some(group))), ); let set_stanza = Stanza::Iq(Iq { from: Some(connection.jid().clone()), id: iq_id.clone(), to: None, r#type: IqType::Set, lang: None, query: Some(iq::Query::Roster(stanza::roster::Query { ver: None, items: vec![stanza::roster::Item { approved: None, ask: false, jid, name: contact_update.name, subscription: None, groups, }], })), errors: Vec::new(), }); let (send, recv) = oneshot::channel(); { logic.pending().lock().await.insert(iq_id.clone(), send); } let result = connection.write_handle().write(set_stanza).await; if let Err(e) = result { sender.send(Err(RosterError::Write(e))); return; } let iq_result = recv.await; match iq_result { Ok(i) => match i { Ok(iq_result) => match iq_result { Stanza::Iq(Iq { from: _, id, to: _, r#type, lang: _, query: _, errors: _, }) if id == iq_id && r#type == IqType::Result => { sender.send(Ok(())); return; } ref s @ Stanza::Iq(Iq { from: _, ref id, to: _, r#type, lang: _, query: _, ref errors, }) if *id == iq_id && r#type == IqType::Error => { if let Some(error) = errors.first() { sender.send(Err(RosterError::StanzaError(error.clone()))); } else { sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); } return; } s => { sender.send(Err(RosterError::UnexpectedStanza(s))); return; } }, Err(e) => { sender.send(Err(e.into())); return; } }, Err(e) => { sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); return; } } } Command::SetStatus(online, sender) => { let result = logic.db().upsert_cached_status(online.clone()).await; if let Err(e) = result { let _ = logic .update_sender() .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( e.into(), )))) .await; } let result = connection .write_handle() .write(Stanza::Presence(online.into_stanza(None))) .await .map_err(|e| StatusError::Write(e)); // .map_err(|e| StatusError::Write(e)); let _ = sender.send(result); } // TODO: offline message queue Command::SendMessage(jid, body, sender) => { let id = Uuid::new_v4(); let message = Stanza::Message(stanza::client::message::Message { from: Some(connection.jid().clone()), id: Some(id.to_string()), to: Some(jid.clone()), // TODO: specify message type r#type: stanza::client::message::MessageType::Chat, // TODO: lang ? lang: None, subject: None, body: Some(stanza::client::message::Body { lang: None, body: Some(body.body.clone()), }), thread: None, delay: None, }); let _ = sender.send(Ok(())); // let _ = sender.send(Ok(message.clone())); let result = connection.write_handle().write(message).await; match result { Ok(_) => { let mut message = Message { id, from: connection.jid().clone(), body, timestamp: Utc::now(), }; info!("send message {:?}", message); if let Err(e) = logic .db() .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) .await .map_err(|e| e.into()) { tracing::error!("{}", e); let _ = logic .update_sender() .send(UpdateMessage::Error(Error::MessageSend( MessageSendError::MessageHistory(e), ))); } // TODO: don't do this, have separate from from details message.from = message.from.as_bare(); let _ = logic .update_sender() .send(UpdateMessage::Message { to: jid, message }) .await; } Err(_) => { // let _ = sender.send(result); } } } Command::SendPresence(jid, presence, sender) => { let mut presence: stanza::client::presence::Presence = presence.into(); if let Some(jid) = jid { presence.to = Some(jid); }; let result = connection .write_handle() .write(Stanza::Presence(presence)) .await; // .map_err(|e| StatusError::Write(e)); let _ = sender.send(result); } } }