aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 19:13:10 +0000
committerLibravatar cel 🌸 <cel@bunny.garden>2025-03-26 19:13:10 +0000
commit8c239e5c7a49cff350104b09cbb74d862c2ec420 (patch)
tree4b392f1ffa6b91fadf68b4a7f67ad5f901fbeda4
parent410fe3af16be5985c868b00908b8ddf4ed6e469d (diff)
downloadluz-8c239e5c7a49cff350104b09cbb74d862c2ec420.tar.gz
luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.tar.bz2
luz-8c239e5c7a49cff350104b09cbb74d862c2ec420.zip
feat: stream error handling
-rw-r--r--filamento/examples/example.rs2
-rw-r--r--filamento/src/error.rs6
-rw-r--r--filamento/src/lib.rs74
-rw-r--r--filamento/src/logic/connection_error.rs7
-rw-r--r--filamento/src/logic/mod.rs40
-rw-r--r--filamento/src/logic/process_stanza.rs421
-rw-r--r--lampada/src/connection/mod.rs52
-rw-r--r--lampada/src/connection/read.rs60
-rw-r--r--lampada/src/connection/write.rs14
-rw-r--r--lampada/src/lib.rs7
-rw-r--r--stanza/src/client/mod.rs24
-rw-r--r--stanza/src/stream.rs4
12 files changed, 369 insertions, 342 deletions
diff --git a/filamento/examples/example.rs b/filamento/examples/example.rs
index f2b787b..e36968d 100644
--- a/filamento/examples/example.rs
+++ b/filamento/examples/example.rs
@@ -7,7 +7,7 @@ use tracing::info;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
- let db = Db::create_connect_and_migrate(Path::new("./filamentoa.db"))
+ let db = Db::create_connect_and_migrate(Path::new("./filamento.db"))
.await
.unwrap();
let (client, mut recv) =
diff --git a/filamento/src/error.rs b/filamento/src/error.rs
index 996a503..c5fdb03 100644
--- a/filamento/src/error.rs
+++ b/filamento/src/error.rs
@@ -16,7 +16,7 @@ pub enum Error {
// TODO: include content
// UnrecognizedContent(peanuts::element::Content),
#[error("iq receive error: {0}")]
- Iq(IqError),
+ Iq(#[from] IqError),
// TODO: change to Connecting(ConnectingError)
#[error("connecting: {0}")]
Connecting(#[from] ConnectionJobError),
@@ -32,7 +32,7 @@ pub enum Error {
#[error("message send error: {0}")]
MessageSend(MessageSendError),
#[error("message receive error: {0}")]
- MessageRecv(MessageRecvError),
+ MessageRecv(#[from] MessageRecvError),
}
#[derive(Debug, Error, Clone)]
@@ -86,7 +86,7 @@ pub enum RosterError {
#[derive(Debug, Error, Clone)]
#[error("database error: {0}")]
-pub struct DatabaseError(Arc<sqlx::Error>);
+pub struct DatabaseError(pub Arc<sqlx::Error>);
impl From<sqlx::Error> for DatabaseError {
fn from(e: sqlx::Error) -> Self {
diff --git a/filamento/src/lib.rs b/filamento/src/lib.rs
index 030dc43..b284c7e 100644
--- a/filamento/src/lib.rs
+++ b/filamento/src/lib.rs
@@ -98,6 +98,33 @@ pub enum Command {
/// chatroom). if disconnected, will be cached so when client connects, message will be sent.
SendMessage(JID, Body, oneshot::Sender<Result<(), WriteError>>),
}
+
+#[derive(Debug, Clone)]
+pub enum UpdateMessage {
+ Error(Error),
+ Online(Online, Vec<Contact>),
+ Offline(Offline),
+ /// received roster from jabber server (replace full app roster state with this)
+ /// is this needed?
+ FullRoster(Vec<Contact>),
+ /// (only update app roster state, don't replace)
+ RosterUpdate(Contact),
+ RosterDelete(JID),
+ /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
+ Presence {
+ from: JID,
+ presence: Presence,
+ },
+ // TODO: receipts
+ // MessageDispatched(Uuid),
+ Message {
+ to: JID,
+ message: Message,
+ },
+ SubscriptionRequest(jid::JID),
+ Unsupported(Stanza),
+}
+
/// an xmpp client that is suited for a chat client use case
#[derive(Debug)]
pub struct Client {
@@ -147,20 +174,24 @@ impl Client {
let (_sup_send, sup_recv) = oneshot::channel();
let sup_recv = sup_recv.fuse();
- let logic = ClientLogic::new(db, Arc::new(Mutex::new(HashMap::new())), update_send);
+ let client = Self {
+ sender: command_sender,
+ // TODO: configure timeout
+ timeout: Duration::from_secs(10),
+ };
+
+ let logic = ClientLogic::new(
+ client.clone(),
+ db,
+ Arc::new(Mutex::new(HashMap::new())),
+ update_send,
+ );
let actor: CoreClient<ClientLogic> =
CoreClient::new(jid, password, command_receiver, None, sup_recv, logic);
tokio::spawn(async move { actor.run().await });
- (
- Self {
- sender: command_sender,
- // TODO: configure timeout
- timeout: Duration::from_secs(10),
- },
- update_recv,
- )
+ (client, update_recv)
}
pub async fn get_roster(&self) -> Result<Vec<Contact>, CommandError<RosterError>> {
@@ -453,28 +484,3 @@ impl From<Command> for CoreClientCommand<Command> {
CoreClientCommand::Command(value)
}
}
-
-#[derive(Debug, Clone)]
-pub enum UpdateMessage {
- Error(Error),
- Online(Online, Vec<Contact>),
- Offline(Offline),
- /// received roster from jabber server (replace full app roster state with this)
- /// is this needed?
- FullRoster(Vec<Contact>),
- /// (only update app roster state, don't replace)
- RosterUpdate(Contact),
- RosterDelete(JID),
- /// presences should be stored with users in the ui, not contacts, as presences can be received from anyone
- Presence {
- from: JID,
- presence: Presence,
- },
- // TODO: receipts
- // MessageDispatched(Uuid),
- Message {
- to: JID,
- message: Message,
- },
- SubscriptionRequest(jid::JID),
-}
diff --git a/filamento/src/logic/connection_error.rs b/filamento/src/logic/connection_error.rs
index ac9e931..081900b 100644
--- a/filamento/src/logic/connection_error.rs
+++ b/filamento/src/logic/connection_error.rs
@@ -1,12 +1,7 @@
use lampada::error::ConnectionError;
-use crate::UpdateMessage;
-
use super::ClientLogic;
pub async fn handle_connection_error(logic: ClientLogic, error: ConnectionError) {
- logic
- .update_sender()
- .send(UpdateMessage::Error(error.into()))
- .await;
+ logic.handle_error(error.into()).await;
}
diff --git a/filamento/src/logic/mod.rs b/filamento/src/logic/mod.rs
index 638f682..365a0df 100644
--- a/filamento/src/logic/mod.rs
+++ b/filamento/src/logic/mod.rs
@@ -3,8 +3,9 @@ use std::{collections::HashMap, sync::Arc};
use lampada::{Logic, error::ReadError};
use stanza::client::Stanza;
use tokio::sync::{Mutex, mpsc, oneshot};
+use tracing::{error, info, warn};
-use crate::{Command, UpdateMessage, db::Db};
+use crate::{Client, Command, UpdateMessage, db::Db, error::Error};
mod abort;
mod connect;
@@ -16,6 +17,7 @@ mod process_stanza;
#[derive(Clone)]
pub struct ClientLogic {
+ client: Client,
db: Db,
pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -23,6 +25,7 @@ pub struct ClientLogic {
impl ClientLogic {
pub fn new(
+ client: Client,
db: Db,
pending: Arc<Mutex<HashMap<String, oneshot::Sender<Result<Stanza, ReadError>>>>>,
update_sender: mpsc::Sender<UpdateMessage>,
@@ -31,9 +34,14 @@ impl ClientLogic {
db,
pending,
update_sender,
+ client,
}
}
+ pub fn client(&self) -> &Client {
+ &self.client
+ }
+
pub fn db(&self) -> &Db {
&self.db
}
@@ -45,6 +53,23 @@ impl ClientLogic {
pub fn update_sender(&self) -> &mpsc::Sender<UpdateMessage> {
&self.update_sender
}
+
+ pub async fn handle_unsupported(&self, stanza: impl Into<Stanza>) {
+ let stanza: Stanza = stanza.into();
+ warn!("received unsupported stanza: {:?}", stanza);
+ self.handle_update(UpdateMessage::Unsupported(stanza)).await;
+ }
+
+ pub async fn handle_update(&self, update: UpdateMessage) {
+ // TODO: impl fmt
+ info!("{:?}", update);
+ self.update_sender().send(update).await;
+ }
+
+ pub async fn handle_error(&self, e: Error) {
+ error!("{}", e);
+ self.handle_update(UpdateMessage::Error(e)).await;
+ }
}
impl Logic for ClientLogic {
@@ -63,13 +88,8 @@ impl Logic for ClientLogic {
disconnect::handle_disconnect(self, connection).await;
}
- async fn handle_stanza(
- self,
- stanza: ::stanza::client::Stanza,
- connection: lampada::Connected,
- supervisor: lampada::SupervisorSender,
- ) {
- process_stanza::handle_stanza(self, stanza, connection, supervisor).await;
+ async fn handle_stanza(self, stanza: ::stanza::client::Stanza, connection: lampada::Connected) {
+ process_stanza::handle_stanza(self, stanza, connection).await;
}
async fn handle_online(self, command: Self::Cmd, connection: lampada::Connected) {
@@ -87,4 +107,8 @@ impl Logic for ClientLogic {
async fn handle_connection_error(self, error: lampada::error::ConnectionError) {
connection_error::handle_connection_error(self, error).await;
}
+
+ async fn handle_stream_error(self, stream_error: stanza::stream::Error) {
+ self.handle_error(Error::Stream(stream_error)).await;
+ }
}
diff --git a/filamento/src/logic/process_stanza.rs b/filamento/src/logic/process_stanza.rs
index 17738df..1a68936 100644
--- a/filamento/src/logic/process_stanza.rs
+++ b/filamento/src/logic/process_stanza.rs
@@ -2,263 +2,238 @@ use std::str::FromStr;
use chrono::Utc;
use lampada::{Connected, SupervisorSender};
-use stanza::client::Stanza;
+use stanza::client::{Stanza, iq::Iq};
use uuid::Uuid;
use crate::{
UpdateMessage,
chat::{Body, Message},
- error::{Error, IqError, MessageRecvError, PresenceError, RosterError},
+ error::{DatabaseError, Error, IqError, MessageRecvError, PresenceError, RosterError},
presence::{Offline, Online, Presence, PresenceType, Show},
roster::Contact,
};
use super::ClientLogic;
-pub async fn handle_stanza(
+pub async fn handle_stanza(logic: ClientLogic, stanza: Stanza, connection: Connected) {
+ let result = process_stanza(logic.clone(), stanza, connection).await;
+ match result {
+ Ok(u) => match u {
+ Some(UpdateMessage::Unsupported(stanza)) => logic.handle_unsupported(stanza).await,
+ _ => {
+ if let Some(u) = u {
+ logic.handle_update(u).await
+ }
+ }
+ },
+ Err(e) => logic.handle_error(e).await,
+ }
+}
+
+pub async fn recv_message(
logic: ClientLogic,
- stanza: Stanza,
- connection: Connected,
- supervisor: SupervisorSender,
-) {
- match stanza {
- Stanza::Message(stanza_message) => {
- if let Some(mut from) = stanza_message.from {
- // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
- let timestamp = stanza_message
+ stanza_message: stanza::client::message::Message,
+) -> Result<Option<UpdateMessage>, MessageRecvError> {
+ if let Some(mut from) = stanza_message.from {
+ // TODO: don't ignore delay from. xep says SHOULD send error if incorrect.
+ let timestamp = stanza_message
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ // TODO: group chat messages
+ let mut message = Message {
+ id: stanza_message
+ .id
+ // TODO: proper id storage
+ .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
+ .unwrap_or_else(|| Uuid::new_v4()),
+ from: from.clone(),
+ timestamp,
+ body: Body {
+ // TODO: should this be an option?
+ body: stanza_message
+ .body
+ .map(|body| body.body)
+ .unwrap_or_default()
+ .unwrap_or_default(),
+ },
+ };
+ // TODO: can this be more efficient?
+ logic
+ .db()
+ .create_message_with_user_resource_and_chat(message.clone(), from.clone())
+ .await
+ .map_err(|e| DatabaseError(e.into()))?;
+ message.from = message.from.as_bare();
+ from = from.as_bare();
+ Ok(Some(UpdateMessage::Message { to: from, message }))
+ } else {
+ Err(MessageRecvError::MissingFrom)
+ }
+}
+
+pub async fn recv_presence(
+ presence: stanza::client::presence::Presence,
+) -> Result<Option<UpdateMessage>, PresenceError> {
+ if let Some(from) = presence.from {
+ match presence.r#type {
+ Some(r#type) => match r#type {
+ // error processing a presence from somebody
+ stanza::client::presence::PresenceType::Error => {
+ // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
+ // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
+ Err(PresenceError::StanzaError(
+ presence
+ .errors
+ .first()
+ .cloned()
+ .expect("error MUST have error"),
+ ))
+ }
+ // should not happen (error to server)
+ stanza::client::presence::PresenceType::Probe => {
+ // TODO: should probably write an error and restart stream
+ Err(PresenceError::Unsupported)
+ }
+ stanza::client::presence::PresenceType::Subscribe => {
+ // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
+ Ok(Some(UpdateMessage::SubscriptionRequest(from)))
+ }
+ stanza::client::presence::PresenceType::Unavailable => {
+ let offline = Offline {
+ status: presence.status.map(|status| status.status.0),
+ };
+ let timestamp = presence
+ .delay
+ .map(|delay| delay.stamp)
+ .unwrap_or_else(|| Utc::now());
+ Ok(Some(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Offline(offline),
+ },
+ }))
+ }
+ // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
+ stanza::client::presence::PresenceType::Subscribed => Ok(None),
+ stanza::client::presence::PresenceType::Unsubscribe => Ok(None),
+ stanza::client::presence::PresenceType::Unsubscribed => Ok(None),
+ },
+ None => {
+ let online = Online {
+ show: presence.show.map(|show| match show {
+ stanza::client::presence::Show::Away => Show::Away,
+ stanza::client::presence::Show::Chat => Show::Chat,
+ stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
+ stanza::client::presence::Show::Xa => Show::ExtendedAway,
+ }),
+ status: presence.status.map(|status| status.status.0),
+ priority: presence.priority.map(|priority| priority.0),
+ };
+ let timestamp = presence
.delay
.map(|delay| delay.stamp)
.unwrap_or_else(|| Utc::now());
- // TODO: group chat messages
- let mut message = Message {
- id: stanza_message
- .id
- // TODO: proper id storage
- .map(|id| Uuid::from_str(&id).unwrap_or_else(|_| Uuid::new_v4()))
- .unwrap_or_else(|| Uuid::new_v4()),
- from: from.clone(),
- timestamp,
- body: Body {
- // TODO: should this be an option?
- body: stanza_message
- .body
- .map(|body| body.body)
- .unwrap_or_default()
- .unwrap_or_default(),
+ Ok(Some(UpdateMessage::Presence {
+ from,
+ presence: Presence {
+ timestamp,
+ presence: PresenceType::Online(online),
},
- };
- // TODO: can this be more efficient?
- let result = logic
- .db()
- .create_message_with_user_resource_and_chat(message.clone(), from.clone())
- .await;
- if let Err(e) = result {
- tracing::error!("messagecreate");
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MessageHistory(e.into()),
- )))
- .await;
- }
- message.from = message.from.as_bare();
- from = from.as_bare();
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Message { to: from, message })
- .await;
- } else {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::MessageRecv(
- MessageRecvError::MissingFrom,
- )))
- .await;
+ }))
}
}
- Stanza::Presence(presence) => {
- if let Some(from) = presence.from {
- match presence.r#type {
- Some(r#type) => match r#type {
- // error processing a presence from somebody
- stanza::client::presence::PresenceType::Error => {
- // TODO: is there any other information that should go with the error? also MUST have an error, otherwise it's a different error. maybe it shoulnd't be an option.
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Presence(
- // TODO: ughhhhhhhhhhhhh these stanza errors should probably just have an option, and custom display
- PresenceError::StanzaError(
- presence
- .errors
- .first()
- .cloned()
- .expect("error MUST have error"),
- ),
- )))
- .await;
- }
- // should not happen (error to server)
- stanza::client::presence::PresenceType::Probe => {
- // TODO: should probably write an error and restart stream
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::Unsupported,
- )))
- .await;
- }
- stanza::client::presence::PresenceType::Subscribe => {
- // may get a subscription request from somebody who is not a contact!!! therefore should be its own kind of event
- let _ = logic
- .update_sender()
- .send(UpdateMessage::SubscriptionRequest(from))
- .await;
- }
- stanza::client::presence::PresenceType::Unavailable => {
- let offline = Offline {
- status: presence.status.map(|status| status.status.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Offline(offline),
- },
- })
- .await;
- }
- // for now, do nothing, as these are simply informational. will receive roster push from the server regarding the changes to do with them.
- stanza::client::presence::PresenceType::Subscribed => {}
- stanza::client::presence::PresenceType::Unsubscribe => {}
- stanza::client::presence::PresenceType::Unsubscribed => {}
- },
- None => {
- let online = Online {
- show: presence.show.map(|show| match show {
- stanza::client::presence::Show::Away => Show::Away,
- stanza::client::presence::Show::Chat => Show::Chat,
- stanza::client::presence::Show::Dnd => Show::DoNotDisturb,
- stanza::client::presence::Show::Xa => Show::ExtendedAway,
- }),
- status: presence.status.map(|status| status.status.0),
- priority: presence.priority.map(|priority| priority.0),
- };
- let timestamp = presence
- .delay
- .map(|delay| delay.stamp)
- .unwrap_or_else(|| Utc::now());
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Presence {
- from,
- presence: Presence {
- timestamp,
- presence: PresenceType::Online(online),
- },
- })
- .await;
- }
- }
+ } else {
+ Err(PresenceError::MissingFrom)
+ }
+}
+
+pub async fn recv_iq(logic: ClientLogic, iq: Iq) -> Result<Option<UpdateMessage>, IqError> {
+ match iq.r#type {
+ stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
+ let send;
+ {
+ send = logic.pending().lock().await.remove(&iq.id);
+ }
+ if let Some(send) = send {
+ send.send(Ok(Stanza::Iq(iq)));
+ Ok(None)
} else {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Presence(
- PresenceError::MissingFrom,
- )))
- .await;
+ Err(IqError::NoMatchingId(iq.id))
}
}
- Stanza::Iq(iq) => match iq.r#type {
- stanza::client::iq::IqType::Error | stanza::client::iq::IqType::Result => {
- let send;
- {
- send = logic.pending().lock().await.remove(&iq.id);
- }
- if let Some(send) = send {
- send.send(Ok(Stanza::Iq(iq)));
- } else {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Iq(IqError::NoMatchingId(
- iq.id,
- ))))
- .await;
- }
- }
- // TODO: send unsupported to server
- // TODO: proper errors i am so tired please
- stanza::client::iq::IqType::Get => {}
- stanza::client::iq::IqType::Set => {
- if let Some(query) = iq.query {
- match query {
- stanza::client::iq::Query::Roster(mut query) => {
- // TODO: there should only be one
- if let Some(item) = query.items.pop() {
- match item.subscription {
- Some(stanza::roster::Subscription::Remove) => {
- logic.db().delete_contact(item.jid.clone()).await;
- logic
- .update_sender()
- .send(UpdateMessage::RosterDelete(item.jid))
- .await;
- // TODO: send result
- }
- _ => {
- let contact: Contact = item.into();
- if let Err(e) =
- logic.db().upsert_contact(contact.clone()).await
- {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Roster(
- RosterError::Cache(e.into()),
- )))
- .await;
- }
+ // TODO: send unsupported to server
+ // TODO: proper errors i am so tired please
+ stanza::client::iq::IqType::Get => Ok(None),
+ stanza::client::iq::IqType::Set => {
+ if let Some(query) = iq.query {
+ match query {
+ stanza::client::iq::Query::Roster(mut query) => {
+ // TODO: there should only be one
+ if let Some(item) = query.items.pop() {
+ match item.subscription {
+ Some(stanza::roster::Subscription::Remove) => {
+ logic.db().delete_contact(item.jid.clone()).await;
+ Ok(Some(UpdateMessage::RosterDelete(item.jid)))
+ }
+ _ => {
+ let contact: Contact = item.into();
+ if let Err(e) = logic.db().upsert_contact(contact.clone()).await
+ {
let _ = logic
.update_sender()
- .send(UpdateMessage::RosterUpdate(contact))
+ .send(UpdateMessage::Error(Error::Roster(
+ RosterError::Cache(e.into()),
+ )))
.await;
- // TODO: send result
- // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
- // from: ,
- // id: todo!(),
- // to: todo!(),
- // r#type: todo!(),
- // lang: todo!(),
- // query: todo!(),
- // errors: todo!(),
- // }));
}
+ Ok(Some(UpdateMessage::RosterUpdate(contact)))
+ // TODO: send result
+ // write_handle.write(Stanza::Iq(stanza::client::iq::Iq {
+ // from: ,
+ // id: todo!(),
+ // to: todo!(),
+ // r#type: todo!(),
+ // lang: todo!(),
+ // query: todo!(),
+ // errors: todo!(),
+ // }));
}
}
+ } else {
+ Ok(None)
}
- // TODO: send unsupported to server
- _ => {}
}
- } else {
- // TODO: send error (unsupported) to server
+ // TODO: send unsupported to server
+ _ => Ok(None),
}
+ } else {
+ // TODO: send error (unsupported) to server
+ Ok(None)
}
- },
+ }
+ }
+}
+
+pub async fn process_stanza(
+ logic: ClientLogic,
+ stanza: Stanza,
+ connection: Connected,
+) -> Result<Option<UpdateMessage>, Error> {
+ let update = match stanza {
+ Stanza::Message(stanza_message) => Ok(recv_message(logic, stanza_message).await?),
+ Stanza::Presence(presence) => Ok(recv_presence(presence).await?),
+ Stanza::Iq(iq) => Ok(recv_iq(logic, iq).await?),
+ // unreachable, always caught by lampada
+ // TODO: make cleaner
Stanza::Error(error) => {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::Stream(error)))
- .await;
- // TODO: reconnect
+ unreachable!()
}
Stanza::OtherContent(content) => {
- let _ = logic
- .update_sender()
- .send(UpdateMessage::Error(Error::UnrecognizedContent));
+ Err(Error::UnrecognizedContent)
// TODO: send error to write_thread
}
- }
+ };
+ update
}
diff --git a/lampada/src/connection/mod.rs b/lampada/src/connection/mod.rs
index 1e767b0..ffaa7a7 100644
--- a/lampada/src/connection/mod.rs
+++ b/lampada/src/connection/mod.rs
@@ -10,7 +10,7 @@ use std::{
use jid::JID;
use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberStream};
use read::{ReadControl, ReadControlHandle, ReadState};
-use stanza::client::Stanza;
+use stanza::{client::Stanza, stream_error::Error as StreamError};
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
@@ -28,7 +28,7 @@ pub(crate) mod write;
pub struct Supervisor<Lgc> {
command_recv: mpsc::Receiver<SupervisorCommand>,
- reader_crash: oneshot::Receiver<ReadState>,
+ reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>,
writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
read_control_handle: ReadControlHandle,
write_control_handle: WriteControlHandle,
@@ -43,18 +43,13 @@ pub enum SupervisorCommand {
Disconnect,
// for if there was a stream error, require to reconnect
// couldn't stream errors just cause a crash? lol
- Reconnect(ChildState),
-}
-
-pub enum ChildState {
- Write(WriteState),
- Read(ReadState),
+ Reconnect(ReadState),
}
impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
fn new(
command_recv: mpsc::Receiver<SupervisorCommand>,
- reader_crash: oneshot::Receiver<ReadState>,
+ reader_crash: oneshot::Receiver<(Option<StreamError>, ReadState)>,
writer_crash: oneshot::Receiver<(WriteMessage, WriteState)>,
read_control_handle: ReadControlHandle,
write_control_handle: WriteControlHandle,
@@ -104,33 +99,19 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
break;
},
// TODO: Reconnect without aborting, gentle reconnect.
+ // the server sent a stream error
SupervisorCommand::Reconnect(state) => {
// TODO: please omfg
// send abort to read stream, as already done, consider
let (read_state, mut write_state);
- match state {
- ChildState::Write(receiver) => {
- write_state = receiver;
- let (send, recv) = oneshot::channel();
- let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
- // TODO: need a tokio select, in case the state arrives from somewhere else
- if let Ok(state) = recv.await {
- read_state = state;
- } else {
- break
- }
- },
- ChildState::Read(read) => {
- read_state = read;
- let (send, recv) = oneshot::channel();
- let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
- // TODO: need a tokio select, in case the state arrives from somewhere else
- if let Ok(state) = recv.await {
- write_state = state;
- } else {
- break
- }
- },
+ read_state = state;
+ let (send, recv) = oneshot::channel();
+ let _ = self.write_control_handle.send(WriteControl::Abort(None, send)).await;
+ // TODO: need a tokio select, in case the state arrives from somewhere else
+ if let Ok(state) = recv.await {
+ write_state = state;
+ } else {
+ break
}
let mut jid = self.connected.jid.clone();
@@ -175,7 +156,8 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
let _ = self.read_control_handle.send(ReadControl::Abort(send)).await;
let read_state = tokio::select! {
Ok(s) = recv => s,
- Ok(s) = &mut self.reader_crash => s,
+ // TODO: is this okay
+ Ok(s) = &mut self.reader_crash => s.1,
// in case, just break as irrecoverable
else => break,
};
@@ -215,9 +197,9 @@ impl<Lgc: Logic + Clone + Send + 'static> Supervisor<Lgc> {
},
}
},
- Ok(read_state) = &mut self.reader_crash => {
+ Ok((stream_error, read_state)) = &mut self.reader_crash => {
let (send, recv) = oneshot::channel();
- let _ = self.write_control_handle.send(WriteControl::Abort(send)).await;
+ let _ = self.write_control_handle.send(WriteControl::Abort(stream_error, send)).await;
let (retry_msg, mut write_state) = tokio::select! {
Ok(s) = recv => (None, s),
Ok(s) = &mut self.writer_crash => (Some(s.0), s.1),
diff --git a/lampada/src/connection/read.rs b/lampada/src/connection/read.rs
index cc69387..640ca8e 100644
--- a/lampada/src/connection/read.rs
+++ b/lampada/src/connection/read.rs
@@ -9,13 +9,15 @@ use std::{
use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberReader};
use stanza::client::Stanza;
+use stanza::stream::Error as StreamErrorStanza;
+use stanza::stream_error::Error as StreamError;
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::{JoinHandle, JoinSet},
};
use tracing::info;
-use crate::{Connected, Logic};
+use crate::{Connected, Logic, WriteMessage};
use super::{write::WriteHandle, SupervisorCommand, SupervisorSender};
@@ -36,7 +38,7 @@ pub struct Read<Lgc> {
// control stuff
control_receiver: mpsc::Receiver<ReadControl>,
- on_crash: oneshot::Sender<ReadState>,
+ on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
}
/// when a crash/abort occurs, this gets sent back to the supervisor, so that the connection session can continue
@@ -54,7 +56,7 @@ impl<Lgc> Read<Lgc> {
logic: Lgc,
supervisor_control: SupervisorSender,
control_receiver: mpsc::Receiver<ReadControl>,
- on_crash: oneshot::Sender<ReadState>,
+ on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
) -> Self {
let (_send, recv) = oneshot::channel();
Self {
@@ -106,34 +108,40 @@ impl<Lgc: Clone + Logic + Send + 'static> Read<Lgc> {
println!("read stanza");
match s {
Ok(s) => {
- self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone(), self.supervisor_control.clone()));
+ match s {
+ Stanza::Error(error) => {
+ self.logic.clone().handle_stream_error(error).await;
+ self.supervisor_control.send(SupervisorCommand::Reconnect(ReadState { supervisor_control: self.supervisor_control.clone(), tasks: self.tasks })).await;
+ break;
+ },
+ _ => {
+ self.tasks.spawn(self.logic.clone().handle_stanza(s, self.connected.clone()));
+ }
+ };
},
Err(e) => {
println!("error: {:?}", e);
- // TODO: NEXT write the correct error stanza depending on error, decide whether to reconnect or properly disconnect, depending on if disconnecting is true
- // match e {
- // peanuts::Error::ReadError(error) => todo!(),
- // peanuts::Error::Utf8Error(utf8_error) => todo!(),
- // peanuts::Error::ParseError(_) => todo!(),
- // peanuts::Error::EntityProcessError(_) => todo!(),
- // peanuts::Error::InvalidCharRef(_) => todo!(),
- // peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => todo!(),
- // peanuts::Error::DuplicateAttribute(_) => todo!(),
- // peanuts::Error::UnqualifiedNamespace(_) => todo!(),
- // peanuts::Error::MismatchedEndTag(name, name1) => todo!(),
- // peanuts::Error::NotInElement(_) => todo!(),
- // peanuts::Error::ExtraData(_) => todo!(),
- // peanuts::Error::UndeclaredNamespace(_) => todo!(),
- // peanuts::Error::IncorrectName(name) => todo!(),
- // peanuts::Error::DeserializeError(_) => todo!(),
- // peanuts::Error::Deserialize(deserialize_error) => todo!(),
- // peanuts::Error::RootElementEnded => todo!(),
- // }
// TODO: make sure this only happens when an end tag is received
if self.disconnecting == true {
break;
} else {
- let _ = self.on_crash.send(ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks });
+ let stream_error = match e {
+ peanuts::Error::ReadError(error) => None,
+ peanuts::Error::Utf8Error(utf8_error) => Some(StreamError::UnsupportedEncoding),
+ peanuts::Error::ParseError(_) => Some(StreamError::BadFormat),
+ peanuts::Error::EntityProcessError(_) => Some(StreamError::RestrictedXml),
+ peanuts::Error::InvalidCharRef(char_ref_error) => Some(StreamError::UnsupportedEncoding),
+ peanuts::Error::DuplicateNameSpaceDeclaration(namespace_declaration) => Some(StreamError::NotWellFormed),
+ peanuts::Error::DuplicateAttribute(_) => Some(StreamError::NotWellFormed),
+ peanuts::Error::MismatchedEndTag(name, name1) => Some(StreamError::NotWellFormed),
+ peanuts::Error::NotInElement(_) => Some(StreamError::InvalidXml),
+ peanuts::Error::ExtraData(_) => None,
+ peanuts::Error::UndeclaredNamespace(_) => Some(StreamError::InvalidNamespace),
+ peanuts::Error::Deserialize(deserialize_error) => Some(StreamError::InvalidXml),
+ peanuts::Error::RootElementEnded => Some(StreamError::InvalidXml),
+ };
+
+ let _ = self.on_crash.send((stream_error, ReadState { supervisor_control: self.supervisor_control, tasks: self.tasks }));
}
break;
},
@@ -183,7 +191,7 @@ impl ReadControlHandle {
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
- on_crash: oneshot::Sender<ReadState>,
+ on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
@@ -210,7 +218,7 @@ impl ReadControlHandle {
connected: Connected,
logic: Lgc,
supervisor_control: SupervisorSender,
- on_crash: oneshot::Sender<ReadState>,
+ on_crash: oneshot::Sender<(Option<StreamError>, ReadState)>,
) -> Self {
let (control_sender, control_receiver) = mpsc::channel(20);
diff --git a/lampada/src/connection/write.rs b/lampada/src/connection/write.rs
index 8f0c34b..1070cdf 100644
--- a/lampada/src/connection/write.rs
+++ b/lampada/src/connection/write.rs
@@ -1,7 +1,9 @@
use std::ops::{Deref, DerefMut};
use luz::{connection::Tls, jabber_stream::bound_stream::BoundJabberWriter};
-use stanza::client::Stanza;
+use stanza::{
+ client::Stanza, stream::Error as StreamErrorStanza, stream_error::Error as StreamError,
+};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
@@ -34,7 +36,7 @@ pub struct WriteMessage {
pub enum WriteControl {
Disconnect,
- Abort(oneshot::Sender<WriteState>),
+ Abort(Option<StreamError>, oneshot::Sender<WriteState>),
}
impl Write {
@@ -119,7 +121,13 @@ impl Write {
break;
},
// in case of abort, stream is already fucked, just send the receiver ready for a reconnection at the same resource
- WriteControl::Abort(sender) => {
+ WriteControl::Abort(error, sender) => {
+ // write stream error message for server if there is one
+ if let Some(error) = error {
+ // TODO: timeouts for writing to stream
+ let _ = self.stream.write(&Stanza::Error(StreamErrorStanza { error, text: None })).await;
+ // don't care about result, if it sends it sends, otherwise stream is restarting anyway
+ }
let _ = sender.send(WriteState { stanza_recv: self.stanza_receiver });
break;
},
diff --git a/lampada/src/lib.rs b/lampada/src/lib.rs
index c61c596..a01ba06 100644
--- a/lampada/src/lib.rs
+++ b/lampada/src/lib.rs
@@ -15,6 +15,7 @@ use stanza::client::{
iq::{self, Iq, IqType},
Stanza,
};
+use stanza::stream::Error as StreamError;
use tokio::{
sync::{mpsc, oneshot, Mutex},
task::JoinSet,
@@ -59,12 +60,16 @@ pub trait Logic {
connection: Connected,
) -> impl std::future::Future<Output = ()> + Send;
+ fn handle_stream_error(
+ self,
+ stream_error: StreamError,
+ ) -> impl std::future::Future<Output = ()> + Send;
+
/// run to handle an incoming xmpp stanza
fn handle_stanza(
self,
stanza: Stanza,
connection: Connected,
- supervisor: SupervisorSender,
) -> impl std::future::Future<Output = ()> + std::marker::Send;
/// run to handle a command message when a connection is currently established
diff --git a/stanza/src/client/mod.rs b/stanza/src/client/mod.rs
index 3e690a7..a1b2de5 100644
--- a/stanza/src/client/mod.rs
+++ b/stanza/src/client/mod.rs
@@ -61,3 +61,27 @@ impl IntoContent for Stanza {
}
}
}
+
+impl From<Message> for Stanza {
+ fn from(value: Message) -> Self {
+ Stanza::Message(value)
+ }
+}
+
+impl From<Presence> for Stanza {
+ fn from(value: Presence) -> Self {
+ Stanza::Presence(value)
+ }
+}
+
+impl From<Iq> for Stanza {
+ fn from(value: Iq) -> Self {
+ Stanza::Iq(value)
+ }
+}
+
+impl From<StreamError> for Stanza {
+ fn from(value: StreamError) -> Self {
+ Stanza::Error(value)
+ }
+}
diff --git a/stanza/src/stream.rs b/stanza/src/stream.rs
index 8e1982f..732a826 100644
--- a/stanza/src/stream.rs
+++ b/stanza/src/stream.rs
@@ -181,8 +181,8 @@ impl FromElement for Feature {
#[derive(Error, Debug, Clone)]
pub struct Error {
- error: StreamError,
- text: Option<Text>,
+ pub error: StreamError,
+ pub text: Option<Text>,
}
impl Display for Error {