diff options
Diffstat (limited to 'filamento/src/logic/online.rs')
-rw-r--r-- | filamento/src/logic/online.rs | 661 |
1 files changed, 661 insertions, 0 deletions
diff --git a/filamento/src/logic/online.rs b/filamento/src/logic/online.rs new file mode 100644 index 0000000..e8cbb33 --- /dev/null +++ b/filamento/src/logic/online.rs @@ -0,0 +1,661 @@ +use chrono::Utc; +use lampada::{Connected, WriteMessage, error::WriteError}; +use stanza::client::{ + Stanza, + iq::{self, Iq, IqType}, +}; +use tokio::sync::oneshot; +use tracing::{debug, info}; +use uuid::Uuid; + +use crate::{ + Command, UpdateMessage, + chat::Message, + error::{Error, MessageSendError, RosterError, StatusError}, + roster::Contact, +}; + +use super::ClientLogic; + +pub async fn handle_online(logic: ClientLogic, command: Command, connection: Connected) { + match command { + Command::GetRoster(result_sender) => { + let iq_id = Uuid::new_v4().to_string(); + let (send, iq_recv) = oneshot::channel(); + { + logic.pending().lock().await.insert(iq_id.clone(), send); + } + let stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.to_string(), + to: None, + r#type: IqType::Get, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: Vec::new(), + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + let _ = connection + .write_handle() + .send(WriteMessage { + stanza, + respond_to: send, + }) + .await; + // TODO: timeout + match recv.await { + Ok(Ok(())) => info!("roster request sent"), + Ok(Err(e)) => { + // TODO: log errors if fail to send + let _ = result_sender.send(Err(RosterError::Write(e.into()))); + return; + } + Err(e) => { + let _ = + result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + }; + // TODO: timeout + match iq_recv.await { + Ok(Ok(stanza)) => match stanza { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: Some(iq::Query::Roster(stanza::roster::Query { ver: _, items })), + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + let contacts: Vec<Contact> = + items.into_iter().map(|item| item.into()).collect(); + if let Err(e) = logic.db().replace_cached_roster(contacts.clone()).await { + logic + .update_sender() + .send(UpdateMessage::Error(Error::Roster(RosterError::Cache( + e.into(), + )))) + .await; + }; + result_sender.send(Ok(contacts)); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + result_sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + result_sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + result_sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Ok(Err(e)) => { + result_sender.send(Err(RosterError::Read(e))); + return; + } + Err(e) => { + result_sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::GetChats(sender) => { + let chats = logic.db().read_chats().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrdered(sender) => { + let chats = logic.db().read_chats_ordered().await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChatsOrderedWithLatestMessages(sender) => { + let chats = logic + .db() + .read_chats_ordered_with_latest_messages() + .await + .map_err(|e| e.into()); + sender.send(chats); + } + Command::GetChat(jid, sender) => { + let chats = logic.db().read_chat(jid).await.map_err(|e| e.into()); + sender.send(chats); + } + Command::GetMessages(jid, sender) => { + let messages = logic + .db() + .read_message_history(jid) + .await + .map_err(|e| e.into()); + sender.send(messages); + } + Command::DeleteChat(jid, sender) => { + let result = logic.db().delete_chat(jid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::DeleteMessage(uuid, sender) => { + let result = logic.db().delete_message(uuid).await.map_err(|e| e.into()); + sender.send(result); + } + Command::GetUser(jid, sender) => { + let user = logic.db().read_user(jid).await.map_err(|e| e.into()); + sender.send(user); + } + // TODO: offline queue to modify roster + Command::AddContact(jid, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: None, + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + logic.pending().lock().await.insert(iq_id.clone(), send); + } + // TODO: write_handle send helper function + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::BuddyRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::SubscriptionRequest(jid, sender) => { + // TODO: i should probably have builders + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::AcceptBuddyRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Subscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::AcceptSubscriptionRequest(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Subscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnsubscribeFromContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnsubscribeContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + Command::UnfriendContact(jid, sender) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid.clone()), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribe), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + match result { + Err(_) => { + let _ = sender.send(result); + } + Ok(()) => { + let presence = Stanza::Presence(stanza::client::presence::Presence { + from: None, + id: None, + to: Some(jid), + r#type: Some(stanza::client::presence::PresenceType::Unsubscribed), + lang: None, + show: None, + status: None, + priority: None, + errors: Vec::new(), + delay: None, + }); + let result = connection.write_handle().write(presence).await; + let _ = sender.send(result); + } + } + } + Command::DeleteContact(jid, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: None, + subscription: Some(stanza::roster::Subscription::Remove), + groups: Vec::new(), + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + logic.pending().lock().await.insert(iq_id.clone(), send); + } + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::UpdateContact(jid, contact_update, sender) => { + let iq_id = Uuid::new_v4().to_string(); + let groups = Vec::from_iter( + contact_update + .groups + .into_iter() + .map(|group| stanza::roster::Group(Some(group))), + ); + let set_stanza = Stanza::Iq(Iq { + from: Some(connection.jid().clone()), + id: iq_id.clone(), + to: None, + r#type: IqType::Set, + lang: None, + query: Some(iq::Query::Roster(stanza::roster::Query { + ver: None, + items: vec![stanza::roster::Item { + approved: None, + ask: false, + jid, + name: contact_update.name, + subscription: None, + groups, + }], + })), + errors: Vec::new(), + }); + let (send, recv) = oneshot::channel(); + { + logic.pending().lock().await.insert(iq_id.clone(), send); + } + let result = connection.write_handle().write(set_stanza).await; + if let Err(e) = result { + sender.send(Err(RosterError::Write(e))); + return; + } + let iq_result = recv.await; + match iq_result { + Ok(i) => match i { + Ok(iq_result) => match iq_result { + Stanza::Iq(Iq { + from: _, + id, + to: _, + r#type, + lang: _, + query: _, + errors: _, + }) if id == iq_id && r#type == IqType::Result => { + sender.send(Ok(())); + return; + } + ref s @ Stanza::Iq(Iq { + from: _, + ref id, + to: _, + r#type, + lang: _, + query: _, + ref errors, + }) if *id == iq_id && r#type == IqType::Error => { + if let Some(error) = errors.first() { + sender.send(Err(RosterError::StanzaError(error.clone()))); + } else { + sender.send(Err(RosterError::UnexpectedStanza(s.clone()))); + } + return; + } + s => { + sender.send(Err(RosterError::UnexpectedStanza(s))); + return; + } + }, + Err(e) => { + sender.send(Err(e.into())); + return; + } + }, + Err(e) => { + sender.send(Err(RosterError::Write(WriteError::Actor(e.into())))); + return; + } + } + } + Command::SetStatus(online, sender) => { + let result = logic.db().upsert_cached_status(online.clone()).await; + if let Err(e) = result { + let _ = logic + .update_sender() + .send(UpdateMessage::Error(Error::SetStatus(StatusError::Cache( + e.into(), + )))) + .await; + } + let result = connection + .write_handle() + .write(Stanza::Presence(online.into_stanza(None))) + .await + .map_err(|e| StatusError::Write(e)); + // .map_err(|e| StatusError::Write(e)); + let _ = sender.send(result); + } + // TODO: offline message queue + Command::SendMessage(jid, body, sender) => { + let id = Uuid::new_v4(); + let message = Stanza::Message(stanza::client::message::Message { + from: Some(connection.jid().clone()), + id: Some(id.to_string()), + to: Some(jid.clone()), + // TODO: specify message type + r#type: stanza::client::message::MessageType::Chat, + // TODO: lang ? + lang: None, + subject: None, + body: Some(stanza::client::message::Body { + lang: None, + body: Some(body.body.clone()), + }), + thread: None, + delay: None, + }); + let _ = sender.send(Ok(())); + // let _ = sender.send(Ok(message.clone())); + let result = connection.write_handle().write(message).await; + match result { + Ok(_) => { + let mut message = Message { + id, + from: connection.jid().clone(), + body, + timestamp: Utc::now(), + }; + info!("send message {:?}", message); + if let Err(e) = logic + .db() + .create_message_with_self_resource_and_chat(message.clone(), jid.clone()) + .await + .map_err(|e| e.into()) + { + tracing::error!("{}", e); + let _ = + logic + .update_sender() + .send(UpdateMessage::Error(Error::MessageSend( + MessageSendError::MessageHistory(e), + ))); + } + // TODO: don't do this, have separate from from details + message.from = message.from.as_bare(); + let _ = logic + .update_sender() + .send(UpdateMessage::Message { to: jid, message }) + .await; + } + Err(_) => { + // let _ = sender.send(result); + } + } + } + Command::SendPresence(jid, presence, sender) => { + let mut presence: stanza::client::presence::Presence = presence.into(); + if let Some(jid) = jid { + presence.to = Some(jid); + }; + let result = connection + .write_handle() + .write(Stanza::Presence(presence)) + .await; + // .map_err(|e| StatusError::Write(e)); + let _ = sender.send(result); + } + } +} |