aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-02-20 21:08:16 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-02-20 21:08:16 +0000
commit2e6ad369c51fa7e60df8c7deaa59ec7705c3ff98 (patch)
treed6191b02372f66c918954acaa570736e544c0193
parentc0d2aae0385aee7f1bb2ecb330e72e40b8fde6a2 (diff)
downloadluz-2e6ad369c51fa7e60df8c7deaa59ec7705c3ff98.tar.gz
luz-2e6ad369c51fa7e60df8c7deaa59ec7705c3ff98.tar.bz2
luz-2e6ad369c51fa7e60df8c7deaa59ec7705c3ff98.zip
implement CLIENT
-rw-r--r--luz/migrations/20240113011930_luz.sql2
-rw-r--r--luz/src/chat.rs4
-rw-r--r--luz/src/connection/read.rs200
-rw-r--r--luz/src/db/mod.rs65
-rw-r--r--luz/src/error.rs27
-rw-r--r--luz/src/lib.rs30
-rw-r--r--luz/src/main.rs26
-rw-r--r--luz/src/presence.rs4
-rw-r--r--luz/src/roster.rs3
9 files changed, 339 insertions, 22 deletions
diff --git a/luz/migrations/20240113011930_luz.sql b/luz/migrations/20240113011930_luz.sql
index 082cc4b..028ae24 100644
--- a/luz/migrations/20240113011930_luz.sql
+++ b/luz/migrations/20240113011930_luz.sql
@@ -14,7 +14,7 @@ create table subscription(
state text primary key not null
);
-insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy');
+insert into subscription ( state ) values ('none'), ('pending-out'), ('pending-in'), ('pending-in-pending-out'), ('only-out'), ('only-in'), ('out-pending-in'), ('in-pending-out'), ('buddy');
-- a roster contains users, with client-set nickname
CREATE TABLE roster(
diff --git a/luz/src/chat.rs b/luz/src/chat.rs
index ff76ce1..4fb8579 100644
--- a/luz/src/chat.rs
+++ b/luz/src/chat.rs
@@ -1,7 +1,7 @@
use jid::JID;
use uuid::Uuid;
-#[derive(Debug, sqlx::FromRow)]
+#[derive(Debug, sqlx::FromRow, Clone)]
pub struct Message {
pub id: Uuid,
// does not contain full user information
@@ -20,7 +20,7 @@ pub struct Message {
// Outside,
// }
-#[derive(Debug, sqlx::FromRow)]
+#[derive(Debug, sqlx::FromRow, Clone)]
pub struct Body {
// TODO: rich text, other contents, threads
pub body: String,
diff --git a/luz/src/connection/read.rs b/luz/src/connection/read.rs
index 8f8c4a0..46f1dc9 100644
--- a/luz/src/connection/read.rs
+++ b/luz/src/connection/read.rs
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
+ str::FromStr,
sync::Arc,
time::Duration,
};
@@ -12,10 +13,14 @@ use tokio::{
task::{JoinHandle, JoinSet},
};
use tracing::info;
+use uuid::Uuid;
use crate::{
+ chat::{Body, Message},
db::Db,
- error::{Error, Reason},
+ error::{Error, IqError, PresenceError, Reason, RecvMessageError},
+ presence::{Offline, Online, Presence, Show},
+ roster::Contact,
UpdateMessage,
};
@@ -116,7 +121,7 @@ impl Read {
println!("read stanza");
match s {
Ok(s) => {
- self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone()));
+ self.tasks.spawn(handle_stanza(s, self.update_sender.clone(), self.db.clone(), self.supervisor_control.clone(), self.write_handle.clone(), self.pending_iqs.clone()));
},
Err(e) => {
println!("error: {:?}", e);
@@ -173,8 +178,197 @@ async fn handle_stanza(
db: Db,
supervisor_control: mpsc::Sender<SupervisorCommand>,
write_handle: WriteHandle,
+ pending_iqs: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, Reason>>>>>,
) {
- println!("{:?}", stanza)
+ match stanza {
+ Stanza::Message(stanza_message) => {
+ if let Some(from) = stanza_message.from {
+ // TODO: group chat messages
+ let message = Message {
+ id: stanza_message
+ .id
+ // TODO: proper id storage
+ .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
+ .unwrap_or_else(|| Uuid::new_v4()),
+ from: from.clone(),
+ body: Body {
+ // TODO: should this be an option?
+ body: stanza_message
+ .body
+ .map(|body| body.body)
+ .unwrap_or_default()
+ .unwrap_or_default(),
+ },
+ };
+ // TODO: can this be more efficient?
+ let result = db
+ .create_message_with_user_and_chat(message.clone(), from.clone())
+ .await;
+ if let Err(e) = result {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::CacheUpdate(e.into())))
+ .await;
+ }
+ let _ = update_sender
+ .send(UpdateMessage::Message { to: from, message })
+ .await;
+ } else {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::RecvMessage(
+ RecvMessageError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Presence(presence) => {
+ if let Some(from) = presence.from {
+ match presence.r#type {
+ Some(r#type) => match r#type {
+ // error processing a presence from somebody
+ stanza::client::presence::PresenceType::Error => {
+ // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::Presence(PresenceError::Error(
+ Reason::Stanza(presence.errors.first().cloned()),
+ ))))
+ .await;
+ }
+ // should not happen (error to server)
+ stanza::client::presence::PresenceType::Probe => {
+ // TODO: should probably write an error and restart stream
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::Unsupported,
+ )))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Subscribe => {
+ // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
+ let _ = update_sender
+ .send(UpdateMessage::SubscriptionRequest(from))
+ .await;
+ }
+ stanza::client::presence::PresenceType::Unavailable => {
+ let offline = Offline {
+ status: presence.status.map(|status| status.status.0),
+ };
+ let _ = update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence::Offline(offline),
+ })
+ .await;
+ }
+ // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
+ stanza::client::presence::PresenceType::Subscribed => {}
+ stanza::client::presence::PresenceType::Unsubscribe => {}
+ stanza::client::presence::PresenceType::Unsubscribed => {}
+ },
+ None => {
+ let online = Online {
+ show: presence.show.map(|show| match show {
+ stanza::client::presence::Show::Away => Show::Away,
+ stanza::client::presence::Show::Chat => Show::Chat,
+ stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
+ stanza::client::presence::Show::Xa => Show::ExtendedAway,
+ }),
+ status: presence.status.map(|status| status.status.0),
+ priority: presence.priority.map(|priority| priority.0),
+ };
+ let _ = update_sender
+ .send(UpdateMessage::Presence {
+ from,
+ presence: Presence::Online(online),
+ })
+ .await;
+ }
+ }
+ } else {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::Presence(
+ PresenceError::MissingFrom,
+ )))
+ .await;
+ }
+ }
+ Stanza::Iq(iq) => match iq.r#type {
+ stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
+ let send;
+ {
+ send = pending_iqs.lock().await.remove(&iq.id);
+ }
+ if let Some(send) = send {
+ send.send(Ok(Stanza::Iq(iq)));
+ } else {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
+ iq.id,
+ ))))
+ .await;
+ }
+ }
+ // TODO: send unsupported to server
+ // TODO: proper errors i am so tired please
+ stanza::client::iq::IqType::Get => {}
+ stanza::client::iq::IqType::Set => {
+ if let Some(query) = iq.query {
+ match query {
+ stanza::client::iq::Query::Roster(mut query) => {
+ // TODO: there should only be one
+ if let Some(item) = query.items.pop() {
+ match item.subscription {
+ Some(stanza::roster::Subscription::Remove) => {
+ db.delete_contact(item.jid.clone()).await;
+ update_sender
+ .send(UpdateMessage::RosterDelete(item.jid))
+ .await;
+ // TODO: send result
+ }
+ _ => {
+ let contact: Contact = item.into();
+ if let Err(e) = db.upsert_contact(contact.clone()).await {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::CacheUpdate(
+ e.into(),
+ )))
+ .await;
+ }
+ let _ = update_sender
+ .send(UpdateMessage::RosterUpdate(contact))
+ .await;
+ // TODO: send result
+ // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
+ // from: ,
+ // id: todo!(),
+ // to: todo!(),
+ // r#type: todo!(),
+ // lang: todo!(),
+ // query: todo!(),
+ // errors: todo!(),
+ // }));
+ }
+ }
+ }
+ }
+ // TODO: send unsupported to server
+ _ => {}
+ }
+ } else {
+ // TODO: send error (unsupported) to server
+ }
+ }
+ },
+ Stanza::Error(error) => {
+ let _ = update_sender
+ .send(UpdateMessage::Error(Error::Stream(error)))
+ .await;
+ // TODO: reconnect
+ }
+ Stanza::OtherContent(content) => {
+ let _ = update_sender.send(UpdateMessage::Error(Error::UnrecognizedContent(content)));
+ // TODO: send error to write_thread
+ }
+ }
}
pub enum ReadControl {
diff --git a/luz/src/db/mod.rs b/luz/src/db/mod.rs
index 7557f70..4202163 100644
--- a/luz/src/db/mod.rs
+++ b/luz/src/db/mod.rs
@@ -160,6 +160,48 @@ impl Db {
Ok(())
}
+ pub async fn upsert_contact(&self, contact: Contact) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into users ( jid ) values ( ? ) on conflict do nothing",
+ contact.user_jid,
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "insert into roster ( user_jid, name, subscription ) values ( ?, ?, ? ) on conflict do update set name = ?, subscription = ?",
+ contact.user_jid,
+ contact.name,
+ contact.subscription,
+ contact.name,
+ contact.subscription
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "delete from groups_roster where contact_jid = ?",
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ // TODO: delete orphaned groups from groups table
+ for group in contact.groups {
+ sqlx::query!(
+ "insert into groups (group_name) values (?) on conflict do nothing",
+ group
+ )
+ .execute(&self.db)
+ .await?;
+ sqlx::query!(
+ "insert into groups_roster (group_name, contact_jid) values (?, ?)",
+ group,
+ contact.user_jid
+ )
+ .execute(&self.db)
+ .await?;
+ }
+ Ok(())
+ }
+
pub async fn delete_contact(&self, contact: JID) -> Result<(), Error> {
sqlx::query!("delete from roster where user_jid = ?", contact)
.execute(&self.db)
@@ -290,6 +332,29 @@ impl Db {
Ok(())
}
+ pub async fn create_message_with_user_and_chat(
+ &self,
+ message: Message,
+ chat: JID,
+ ) -> Result<(), Error> {
+ sqlx::query!(
+ "insert into users (jid) values (?) on conflict do nothing",
+ chat
+ )
+ .execute(&self.db)
+ .await?;
+ let id = Uuid::new_v4();
+ sqlx::query!(
+ "insert into chats (id, correspondent) values (?, ?) on conflict do nothing",
+ id,
+ chat
+ )
+ .execute(&self.db)
+ .await?;
+ self.create_message(message, chat).await?;
+ Ok(())
+ }
+
pub async fn read_message(&self, message: Uuid) -> Result<Message, Error> {
let message: Message = sqlx::query_as("select * from messages where id = ?")
.bind(message)
diff --git a/luz/src/error.rs b/luz/src/error.rs
index fbbcd2b..4fdce79 100644
--- a/luz/src/error.rs
+++ b/luz/src/error.rs
@@ -6,12 +6,35 @@ pub enum Error {
AlreadyConnected,
// TODO: change to Connecting(ConnectingError)
Connection(ConnectionError),
- Presence(Reason),
+ Presence(PresenceError),
+ SetStatus(Reason),
Roster(Reason),
+ Stream(stanza::stream::Error),
SendMessage(Reason),
+ RecvMessage(RecvMessageError),
AlreadyDisconnected,
LostConnection,
+ // TODO: should all cache update errors include the context?
CacheUpdate(Reason),
+ UnrecognizedContent(peanuts::element::Content),
+ Iq(IqError),
+}
+
+#[derive(Debug)]
+pub enum PresenceError {
+ Error(Reason),
+ Unsupported,
+ MissingFrom,
+}
+
+#[derive(Debug)]
+pub enum IqError {
+ NoMatchingId(String),
+}
+
+#[derive(Debug)]
+pub enum RecvMessageError {
+ MissingFrom,
}
#[derive(Debug)]
@@ -40,7 +63,7 @@ pub struct StatusError(pub Reason);
impl From<StatusError> for Error {
fn from(e: StatusError) -> Self {
- Error::Presence(e.0)
+ Error::SetStatus(e.0)
}
}
diff --git a/luz/src/lib.rs b/luz/src/lib.rs
index cffffb2..901553b 100644
--- a/luz/src/lib.rs
+++ b/luz/src/lib.rs
@@ -20,7 +20,7 @@ use tokio::{
sync::{mpsc, oneshot, Mutex},
task::JoinSet,
};
-use tracing::{info, Instrument};
+use tracing::{debug, info, Instrument};
use user::User;
use uuid::Uuid;
@@ -28,7 +28,7 @@ use crate::connection::write::WriteHandle;
use crate::connection::{SupervisorCommand, SupervisorHandle};
use crate::error::Error;
-mod chat;
+pub mod chat;
mod connection;
mod db;
mod error;
@@ -103,12 +103,19 @@ impl Luz {
.await;
}
None => {
- let mut jid = self.jid.lock().await;
- let mut domain = jid.domainpart.clone();
- // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
- let streams_result =
- jabber::connect_and_login(&mut jid, &*self.password, &mut domain)
- .await;
+ let streams_result;
+ {
+ let mut jid = self.jid.lock().await;
+ let mut domain = jid.domainpart.clone();
+ // TODO: check what happens upon reconnection with same resource (this is probably what one wants to do and why jid should be mutated from a bare jid to one with a resource)
+ streams_result = jabber::connect_and_login(
+ &mut jid,
+ &*self.password,
+ &mut domain,
+ )
+ .await;
+ debug!("connected and logged in as {}", jid);
+ }
match streams_result {
Ok(s) => {
let (shutdown_send, shutdown_recv) = oneshot::channel::<()>();
@@ -124,6 +131,7 @@ impl Luz {
self.connection_supervisor_shutdown = shutdown_recv;
// TODO: get roster and send initial presence
let (send, recv) = oneshot::channel();
+ debug!("getting roster");
CommandMessage::GetRoster(send)
.handle_online(
writer.clone(),
@@ -134,7 +142,9 @@ impl Luz {
self.pending_iqs.clone(),
)
.await;
+ debug!("sent roster req");
let roster = recv.await;
+ debug!("got roster");
match roster {
Ok(r) => {
match r {
@@ -371,9 +381,11 @@ impl CommandMessage {
CommandMessage::GetRoster(result_sender) => {
// TODO: jid resource should probably be stored within the connection
let owned_jid: JID;
+ debug!("before client_jid lock");
{
owned_jid = client_jid.lock().await.clone();
}
+ debug!("after client_jid lock");
let iq_id = Uuid::new_v4().to_string();
let (send, iq_recv) = oneshot::channel();
{
@@ -1062,6 +1074,7 @@ pub enum UpdateMessage {
FullRoster(Vec<Contact>),
/// (only update app roster state, don't replace)
RosterUpdate(Contact),
+ RosterDelete(JID),
/// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
Presence {
from: JID,
@@ -1073,4 +1086,5 @@ pub enum UpdateMessage {
to: JID,
message: Message,
},
+ SubscriptionRequest(jid::JID),
}
diff --git a/luz/src/main.rs b/luz/src/main.rs
index 5e9cd13..9779351 100644
--- a/luz/src/main.rs
+++ b/luz/src/main.rs
@@ -1,8 +1,13 @@
-use std::time::Duration;
+use std::{str::FromStr, time::Duration};
+use jid::JID;
use luz::{CommandMessage, LuzHandle};
use sqlx::SqlitePool;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::{
+ io::{AsyncReadExt, AsyncWriteExt},
+ sync::oneshot,
+};
+use tracing::info;
#[tokio::main]
async fn main() {
@@ -13,10 +18,23 @@ async fn main() {
tokio::spawn(async move {
while let Some(msg) = recv.recv().await {
- println!("{:#?}", msg)
+ info!("{:#?}", msg)
}
});
luz.send(CommandMessage::Connect).await.unwrap();
- tokio::time::sleep(Duration::from_secs(15)).await;
+ let (send, recv) = oneshot::channel();
+ tokio::time::sleep(Duration::from_secs(5)).await;
+ info!("sending message");
+ luz.send(CommandMessage::SendMessage(
+ JID::from_str("cel@blos.sm").unwrap(),
+ luz::chat::Body {
+ body: "hallo!!!".to_string(),
+ },
+ send,
+ ))
+ .await
+ .unwrap();
+ recv.await.unwrap().unwrap();
+ println!("sent message");
}
diff --git a/luz/src/presence.rs b/luz/src/presence.rs
index 563121b..1df20a7 100644
--- a/luz/src/presence.rs
+++ b/luz/src/presence.rs
@@ -6,7 +6,7 @@ pub struct Online {
pub show: Option<Show>,
pub status: Option<String>,
#[sqlx(skip)]
- priority: Option<i8>,
+ pub priority: Option<i8>,
}
#[derive(Debug, Clone, Copy)]
@@ -55,7 +55,7 @@ impl sqlx::Encode<'_, Sqlite> for Show {
#[derive(Debug, Default)]
pub struct Offline {
- status: Option<String>,
+ pub status: Option<String>,
}
#[derive(Debug)]
diff --git a/luz/src/roster.rs b/luz/src/roster.rs
index e3db00f..0e43a8a 100644
--- a/luz/src/roster.rs
+++ b/luz/src/roster.rs
@@ -27,6 +27,7 @@ pub enum Subscription {
None,
PendingOut,
PendingIn,
+ PendingInPendingOut,
OnlyOut,
OnlyIn,
OutPendingIn,
@@ -51,6 +52,7 @@ impl sqlx::Decode<'_, Sqlite> for Subscription {
"none" => Ok(Self::None),
"pending-out" => Ok(Self::PendingOut),
"pending-in" => Ok(Self::PendingIn),
+ "pending-in-pending-out" => Ok(Self::PendingInPendingOut),
"only-out" => Ok(Self::OnlyOut),
"only-in" => Ok(Self::OnlyIn),
"out-pending-in" => Ok(Self::OutPendingIn),
@@ -70,6 +72,7 @@ impl sqlx::Encode<'_, Sqlite> for Subscription {
Subscription::None => "none",
Subscription::PendingOut => "pending-out",
Subscription::PendingIn => "pending-in",
+ Subscription::PendingInPendingOut => "pending-in-pending-out",
Subscription::OnlyOut => "only-out",
Subscription::OnlyIn => "only-in",
Subscription::OutPendingIn => "out-pending-in",