From f7d6bdf1ed1b9d1d688e04830433ec89a66ffa55 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 18 Oct 2024 23:02:59 +0200 Subject: [PATCH 01/13] #58: remove kill_sends --- src/gateway/establish_connection.rs | 20 ++++++-------- src/gateway/gateway_task.rs | 19 +++++-------- src/gateway/heartbeat.rs | 42 ++++++----------------------- src/gateway/types/mod.rs | 6 +---- 4 files changed, 23 insertions(+), 64 deletions(-) diff --git a/src/gateway/establish_connection.rs b/src/gateway/establish_connection.rs index 8b8e66b..0a0837c 100644 --- a/src/gateway/establish_connection.rs +++ b/src/gateway/establish_connection.rs @@ -42,8 +42,6 @@ struct State { config: Config, connected_users: ConnectedUsers, sequence_number: Arc>, - kill_send: Sender<()>, - kill_receive: tokio::sync::broadcast::Receiver<()>, /// Receiver for heartbeat messages. The `HeartbeatHandler` will receive messages from this channel. heartbeat_receive: tokio::sync::broadcast::Receiver, /// Sender for heartbeat messages. The main gateway task will send messages to this channel for the `HeartbeatHandler` to receive and handle. @@ -100,8 +98,6 @@ pub(super) async fn establish_connection( config: config.clone(), connected_users: connected_users.clone(), sequence_number: sequence_number.clone(), - kill_send: kill_send.clone(), - kill_receive: kill_receive.resubscribe(), heartbeat_receive: message_receive.resubscribe(), heartbeat_send: message_send.clone(), session_id_send: session_id_send.clone(), @@ -150,7 +146,11 @@ async fn finish_connecting( Ok(next) => next, Err(_) => { log::debug!(target: "symfonia::gateway::finish_connecting", "Encountered error when trying to receive message. Sending kill signal..."); - state.kill_send.send(()).expect("Failed to send kill_send"); + state + .connection + .kill_send + .send(()) + .expect("Failed to send kill_send"); return Err(GatewayError::Timeout.into()); } }; @@ -172,8 +172,6 @@ async fn finish_connecting( heartbeat_handler_handle = Some(tokio::spawn({ let mut heartbeat_handler = HeartbeatHandler::new( state.connection.clone(), - state.kill_receive.resubscribe(), - state.kill_send.clone(), state.heartbeat_receive.resubscribe(), state.sequence_number.clone(), state.session_id_receive.resubscribe(), @@ -205,6 +203,7 @@ async fn finish_connecting( Err(_) => { log::trace!(target: "symfonia::gateway::establish_connection::finish_connecting", "Failed to verify token"); state + .connection .kill_send .send(()) .expect("Failed to send kill signal"); @@ -217,8 +216,6 @@ async fn finish_connecting( let main_task_handle = tokio::spawn(gateway_task::gateway_task( state.connection.clone(), gateway_user.lock().await.inbox.resubscribe(), - state.kill_receive.resubscribe(), - state.kill_send.clone(), state.heartbeat_send.clone(), state.sequence_number.clone(), )); @@ -235,8 +232,6 @@ async fn finish_connecting( log::trace!(target: "symfonia::gateway::establish_connection::finish_connecting", "No heartbeat_handler yet. Creating one..."); let mut heartbeat_handler = HeartbeatHandler::new( state.connection.clone(), - state.kill_receive.resubscribe(), - state.kill_send.clone(), state.heartbeat_receive.resubscribe(), state.sequence_number.clone(), state.session_id_receive.resubscribe(), @@ -246,7 +241,6 @@ async fn finish_connecting( } }), }, - state.kill_send.clone(), &identify.event_data.token, state.sequence_number.clone(), ) @@ -256,6 +250,7 @@ async fn finish_connecting( Err(_) => { log::error!(target: "symfonia::gateway::establish_connection::finish_connecting", "Failed to send session_id to heartbeat handler"); state + .connection .kill_send .send(()) .expect("Failed to send kill signal"); @@ -289,6 +284,7 @@ async fn finish_connecting( .into(), })))?; state + .connection .kill_send .send(()) .expect("Failed to send kill signal"); diff --git a/src/gateway/gateway_task.rs b/src/gateway/gateway_task.rs index e2cde18..a2662f7 100644 --- a/src/gateway/gateway_task.rs +++ b/src/gateway/gateway_task.rs @@ -18,17 +18,11 @@ use super::{Event, GatewayClient, GatewayPayload}; pub(super) async fn gateway_task( mut connection: super::WebSocketConnection, mut inbox: tokio::sync::broadcast::Receiver, - mut kill_receive: tokio::sync::broadcast::Receiver<()>, - mut kill_send: tokio::sync::broadcast::Sender<()>, mut heartbeat_send: tokio::sync::broadcast::Sender, last_sequence_number: Arc>, ) { log::trace!(target: "symfonia::gateway::gateway_task", "Started a new gateway task!"); - let inbox_processor = tokio::spawn(process_inbox( - connection.clone(), - inbox.resubscribe(), - kill_receive.resubscribe(), - )); + let inbox_processor = tokio::spawn(process_inbox(connection.clone(), inbox.resubscribe())); /* Before we can respond to any gateway event we receive, we need to figure out what kind of event @@ -39,18 +33,18 @@ pub(super) async fn gateway_task( loop { tokio::select! { - _ = kill_receive.recv() => { + _ = connection.kill_receive.recv() => { return; }, message_result = connection.receiver.recv() => { match message_result { Ok(message_of_unknown_type) => { - let event = unwrap_event(Event::try_from(message_of_unknown_type), connection.clone(), kill_send.clone()); + let event = unwrap_event(Event::try_from(message_of_unknown_type), connection.clone(), connection.kill_send.clone()); // TODO: Handle event }, Err(error) => { connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4000), reason: "INTERNAL_SERVER_ERROR".into() }))); - kill_send.send(()).expect("Failed to send kill_send"); + connection.kill_send.send(()).expect("Failed to send kill_send"); return; }, } @@ -112,13 +106,12 @@ fn unwrap_event( } async fn process_inbox( - connection: super::WebSocketConnection, + mut connection: super::WebSocketConnection, mut inbox: tokio::sync::broadcast::Receiver, - mut kill_receive: tokio::sync::broadcast::Receiver<()>, ) { loop { tokio::select! { - _ = kill_receive.recv() => { + _ = connection.kill_receive.recv() => { return; } event = inbox.recv() => { diff --git a/src/gateway/heartbeat.rs b/src/gateway/heartbeat.rs index 2c21960..7164e56 100644 --- a/src/gateway/heartbeat.rs +++ b/src/gateway/heartbeat.rs @@ -20,8 +20,6 @@ static LATENCY_BUFFER: std::time::Duration = std::time::Duration::from_secs(5); pub(super) struct HeartbeatHandler { connection: WebSocketConnection, - kill_receive: tokio::sync::broadcast::Receiver<()>, - kill_send: tokio::sync::broadcast::Sender<()>, message_receive: tokio::sync::broadcast::Receiver, last_heartbeat: std::time::Instant, /// The current sequence number of the gateway connection. @@ -35,9 +33,7 @@ impl HeartbeatHandler { /// This method initializes a new heartbeat handler with the provided connection, kill signals, and message receiver. It sets up the internal state for tracking the last heartbeat time. /// /// # Parameters - /// - `connection`: A shared reference to a mutex-protected connection object. - /// - `kill_receive`: A channel receiver for signaling the shutdown of the heartbeat handler. - /// - `kill_send`: A channel sender for sending signals to shut down the heartbeat handler. + /// - `connection`: A shared connection object. /// - `message_receive`: An MPSC (Multiple Producer Single Consumer) channel receiver for receiving heartbeat messages. /// - `session_id_receive`: A oneshot channel receiver for receiving the session ID. The heartbeat handler may start /// running before an identify or resume message with a session ID is received, so this channel is used to wait for @@ -64,8 +60,6 @@ impl HeartbeatHandler { /// ``` pub(super) fn new( connection: WebSocketConnection, - kill_receive: tokio::sync::broadcast::Receiver<()>, - kill_send: tokio::sync::broadcast::Sender<()>, message_receive: tokio::sync::broadcast::Receiver, last_sequence_number: Arc>, session_id_receive: tokio::sync::broadcast::Receiver, @@ -73,8 +67,6 @@ impl HeartbeatHandler { trace!(target: "symfonia::gateway::heartbeat_handler", "New heartbeat handler created"); Self { connection, - kill_receive, - kill_send, message_receive, last_heartbeat: std::time::Instant::now(), sequence_number: last_sequence_number, @@ -101,25 +93,6 @@ impl HeartbeatHandler { /// initialization. The corresponding `kill_receive` can be used by other tasks to signal that /// the Gateway connection should be closed. In the context of symfonia, this is being done to /// close the [GatewayTask]. - /// - /// - /// ## Example - /// ```rust - /// use std::sync::Arc; - /// use tokio::sync::broadcast; - /// use tokio::sync::mpsc; - /// use chorus::types::GatewayHeartbeat; - /// use super::Connection; - /// use super::HeartbeatHandler; - /// - /// let connection = Arc::new(Mutex::new(Connection::new())); - /// let (kill_send, kill_receive) = broadcast::channel(1); - /// let (message_send, message_receive) = mpsc::channel(16); - /// - /// let mut handler = HeartbeatHandler::new(connection, kill_receive, kill_send, message_receive).await; - /// tokio::spawn(async move { - /// handler.run(); - /// }); /// ``` pub(super) async fn run(&mut self) { trace!(target: "symfonia::gateway::heartbeat_handler", "Heartbeat handler started"); @@ -133,7 +106,7 @@ impl HeartbeatHandler { // // I would consider "way off" to be a difference of more than or equal to 3. tokio::select! { - _ = self.kill_receive.recv() => { + _ = self.connection.kill_receive.recv() => { trace!("Received kill signal in heartbeat_handler. Stopping heartbeat handler"); break; } @@ -157,10 +130,10 @@ impl HeartbeatHandler { Ok(_) => (), Err(e) => { trace!("Failed to send reconnect message in heartbeat_handler. Stopping gateway_task and heartbeat_handler"); - self.kill_send.send(()).expect("Failed to send kill signal in heartbeat_handler"); + self.connection.kill_send.send(()).expect("Failed to send kill signal in heartbeat_handler"); } }; - self.kill_send.send(()).expect("Failed to send kill signal in heartbeat_handler"); + self.connection.kill_send.send(()).expect("Failed to send kill signal in heartbeat_handler"); return; } } @@ -172,7 +145,7 @@ impl HeartbeatHandler { Ok(_) => (), Err(_) => { trace!("Failed to send heartbeat ack in heartbeat_handler. Stopping gateway_task and heartbeat_handler"); - self.kill_send.send(()).expect("Failed to send kill signal in heartbeat_handler"); + self.connection.kill_send.send(()).expect("Failed to send kill signal in heartbeat_handler"); }, } @@ -184,7 +157,7 @@ impl HeartbeatHandler { let elapsed = std::time::Instant::now() - self.last_heartbeat; if elapsed > std::time::Duration::from_secs(45) { trace!("Heartbeat timed out in heartbeat_handler. Stopping gateway_task and heartbeat_handler"); - self.kill_send.send(()).expect("Failed to send kill signal in heartbeat_handler");; + self.connection.kill_send.send(()).expect("Failed to send kill signal in heartbeat_handler");; break; } } @@ -211,7 +184,8 @@ impl HeartbeatHandler { Ok(_) => (), Err(_) => { trace!("Failed to send heartbeat ack in heartbeat_handler. Stopping gateway_task and heartbeat_handler"); - self.kill_send + self.connection + .kill_send .send(()) .expect("Failed to send kill signal in heartbeat_handler"); } diff --git a/src/gateway/types/mod.rs b/src/gateway/types/mod.rs index d3784dc..d2acef3 100644 --- a/src/gateway/types/mod.rs +++ b/src/gateway/types/mod.rs @@ -147,8 +147,6 @@ pub struct GatewayClient { main_task_handle: tokio::task::JoinHandle<()>, // Handle to the heartbeat task for this client heartbeat_task_handle: tokio::task::JoinHandle<()>, - // Kill switch to disconnect the client - pub kill_send: tokio::sync::broadcast::Sender<()>, /// Token of the session token used for this connection pub session_token: String, /// The last sequence number received from the client. Shared between the main task, heartbeat @@ -283,7 +281,6 @@ impl ConnectedUsers { connection: WebSocketConnection, main_task_handle: tokio::task::JoinHandle<()>, heartbeat_task_handle: tokio::task::JoinHandle<()>, - kill_send: tokio::sync::broadcast::Sender<()>, session_token: &str, last_sequence: Arc>, ) -> Arc> { @@ -292,7 +289,6 @@ impl ConnectedUsers { parent: Arc::downgrade(&user), main_task_handle, heartbeat_task_handle, - kill_send, session_token: session_token.to_string(), last_sequence, }; @@ -326,7 +322,7 @@ impl GatewayClient { /// Disconnects a [GatewayClient] properly, including un-registering it from the memory store /// and creating a resumeable session. pub async fn die(mut self, connected_users: ConnectedUsers) { - self.kill_send.send(()).unwrap(); + self.connection.kill_send.send(()).unwrap(); let disconnect_info = DisconnectInfo { session_token: self.session_token.clone(), disconnected_at_sequence: *self.last_sequence.lock().await, From 154006ed4d498049ae4fb133d454540acb8a4231 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 18 Oct 2024 23:44:38 +0200 Subject: [PATCH 02/13] bind to 0.0.0.0 by default --- src/api/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index bcda731..844198e 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -86,7 +86,7 @@ pub async fn start_api( .with(NormalizePath::new(TrailingSlash::Trim)) .catch_all_error(custom_error); - let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("localhost:3001")); + let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("0.0.0.0:3001")); let bind_clone = bind.clone(); log::info!(target: "symfonia::api", "Starting HTTP Server"); From 9a47b14c9ad6f44bcf13bcaf4c33819d3786482a Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 18 Oct 2024 23:55:25 +0200 Subject: [PATCH 03/13] remove weird ticks from warn messages, add warn message to api_bind env var --- src/api/mod.rs | 7 ++++++- src/database/mod.rs | 10 +++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 844198e..5d66388 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -4,6 +4,8 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ +static DEFAULT_API_BIND: &str = "0.0.0.0:3001"; + use poem::{ listener::TcpListener, middleware::{NormalizePath, TrailingSlash}, @@ -86,7 +88,10 @@ pub async fn start_api( .with(NormalizePath::new(TrailingSlash::Trim)) .catch_all_error(custom_error); - let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("0.0.0.0:3001")); + let bind = &std::env::var("API_BIND").unwrap_or_else(|_| { + log::warn!(target: "symfonia::db", "You did not specify API_BIND environment variable. Defaulting to '{DEFAULT_API_BIND}'."); + DEFAULT_API_BIND.to_string() + }); let bind_clone = bind.clone(); log::info!(target: "symfonia::api", "Starting HTTP Server"); diff --git a/src/database/mod.rs b/src/database/mod.rs index c44ee92..e6ce4af 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -22,25 +22,25 @@ static DEFAULT_CONNECTION_PORT: u16 = 5432; pub async fn establish_connection() -> Result { let db_url = std::env::var("DATABASE_HOST").unwrap_or_else(|_| { - log::warn!(target: "symfonia::db", "You did not specify `DATABASE_HOST` environment variable, defaulting to '{DEFAULT_CONNECTION_HOST}'."); + log::warn!(target: "symfonia::db", "You did not specify DATABASE_HOST environment variable, defaulting to '{DEFAULT_CONNECTION_HOST}'."); DEFAULT_CONNECTION_HOST.to_string() }); let connect_options = PgConnectOptions::new() .host(&db_url) .port(std::env::var("DATABASE_PORT").unwrap_or_else(|_| { - log::warn!(target: "symfonia::db", "You did not specify `DATABASE_PORT` environment variable. Defaulting to '{DEFAULT_CONNECTION_PORT}'."); + log::warn!(target: "symfonia::db", "You did not specify DATABASE_PORT environment variable. Defaulting to '{DEFAULT_CONNECTION_PORT}'."); DEFAULT_CONNECTION_PORT.to_string() }).parse::().expect("DATABASE_PORT must be a valid 16 bit unsigned integer.")) .username(&std::env::var("DATABASE_USERNAME").unwrap_or_else(|_| { - log::warn!(target: "symfonia::db", "You did not specify `DATABASE_USERNAME` environment variable. Defaulting to '{DEFAULT_CONNECTION_USERNAME}'."); + log::warn!(target: "symfonia::db", "You did not specify DATABASE_USERNAME environment variable. Defaulting to '{DEFAULT_CONNECTION_USERNAME}'."); DEFAULT_CONNECTION_USERNAME.to_string() })) .password(&std::env::var("DATABASE_PASSWORD").unwrap_or_else(|_| { - log::warn!(target: "symfonia::db", "You did not specify `DATABASE_PASSWORD` environment variable. Defaulting to '{DEFAULT_CONNECTION_PASSWORD}'."); + log::warn!(target: "symfonia::db", "You did not specify DATABASE_PASSWORD environment variable. Defaulting to '{DEFAULT_CONNECTION_PASSWORD}'."); DEFAULT_CONNECTION_PASSWORD.to_string() })) .database(&std::env::var("DATABASE_NAME").unwrap_or_else(|_| { - log::warn!(target: "symfonia::db", "You did not specify `DATABASE_NAME` environment variable. Defaulting to '{DEFAULT_CONNECTION_NAME}'."); + log::warn!(target: "symfonia::db", "You did not specify DATABASE_NAME environment variable. Defaulting to '{DEFAULT_CONNECTION_NAME}'."); DEFAULT_CONNECTION_NAME.to_string() })); let pool = PgPool::connect_with(connect_options).await?; From 45170de1d964894e16b249c148e3e07265d5a280 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 18 Oct 2024 23:58:23 +0200 Subject: [PATCH 04/13] add gateway_bind log message --- src/gateway/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index fd7769e..05681c5 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -5,6 +5,7 @@ */ static RESUME_RECONNECT_WINDOW_SECONDS: u8 = 90; +static DEFAULT_GATEWAY_BIND: &str = "0.0.0.0:3003"; mod establish_connection; mod gateway_task; @@ -102,7 +103,10 @@ pub async fn start_gateway( // TODO(bitfl0wer): Add log messages throughout the method for debugging the gateway info!(target: "symfonia::gateway", "Starting gateway server"); - let bind = std::env::var("GATEWAY_BIND").unwrap_or_else(|_| String::from("localhost:3003")); + let bind = &std::env::var("GATEWAY_BIND").unwrap_or_else(|_| { + log::warn!(target: "symfonia::db", "You did not specify GATEWAY_BIND environment variable. Defaulting to '{DEFAULT_GATEWAY_BIND}'."); + DEFAULT_GATEWAY_BIND.to_string() + }); let try_socket = TcpListener::bind(&bind).await; let listener = try_socket.expect("Failed to bind to address"); From 71aa57413bfada98ee0f388ee899d3c26ec99f1b Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sat, 19 Oct 2024 11:50:57 +0200 Subject: [PATCH 05/13] updated non-docker instructions --- ...5cbb8488020c74eed1bb8e68777dd12d551f9.json | 22 ------------------ ...3a347a745d2ea64ded9c7021c21cb56a2ee86.json | 22 ------------------ ...c7897e45450634c4f2e1a7a4ece72e53f056c.json | 21 ----------------- ...750630ecb9ba50f209b2818c38251720f397b.json | 17 -------------- ...9d0e4ca3b479d01ea7446ac7c120435c52584.json | 23 ------------------- README.md | 2 ++ 6 files changed, 2 insertions(+), 105 deletions(-) delete mode 100644 .sqlx/query-00eb44be0581509631734824a9d5cbb8488020c74eed1bb8e68777dd12d551f9.json delete mode 100644 .sqlx/query-0d7b2cee7e790ce2f620a2def603a347a745d2ea64ded9c7021c21cb56a2ee86.json delete mode 100644 .sqlx/query-90499a591252a71ff3ed78f2662c7897e45450634c4f2e1a7a4ece72e53f056c.json delete mode 100644 .sqlx/query-93f9974c9aacce5d679f93ae721750630ecb9ba50f209b2818c38251720f397b.json delete mode 100644 .sqlx/query-b8fccc1abcb63d1442a3136927d9d0e4ca3b479d01ea7446ac7c120435c52584.json diff --git a/.sqlx/query-00eb44be0581509631734824a9d5cbb8488020c74eed1bb8e68777dd12d551f9.json b/.sqlx/query-00eb44be0581509631734824a9d5cbb8488020c74eed1bb8e68777dd12d551f9.json deleted file mode 100644 index f36c175..0000000 --- a/.sqlx/query-00eb44be0581509631734824a9d5cbb8488020c74eed1bb8e68777dd12d551f9.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO users (id, username, discriminator, email, data, fingerprints, premium, premium_type, created_at, flags, public_flags, purchased_flags, premium_usage_flags, rights, extended_settings, settings_index) VALUES ($1, $2, $3, $4, $5, $6, false, 0, $7, 0, 0, 0, 0, $8, '{}', $9)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Numeric", - "Varchar", - "Varchar", - "Varchar", - "Json", - "Text", - "Timestamp", - "Numeric", - "Numeric" - ] - }, - "nullable": [] - }, - "hash": "00eb44be0581509631734824a9d5cbb8488020c74eed1bb8e68777dd12d551f9" -} diff --git a/.sqlx/query-0d7b2cee7e790ce2f620a2def603a347a745d2ea64ded9c7021c21cb56a2ee86.json b/.sqlx/query-0d7b2cee7e790ce2f620a2def603a347a745d2ea64ded9c7021c21cb56a2ee86.json deleted file mode 100644 index 11a74e1..0000000 --- a/.sqlx/query-0d7b2cee7e790ce2f620a2def603a347a745d2ea64ded9c7021c21cb56a2ee86.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO user_settings (locale) VALUES ($1) RETURNING index as inner", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "inner", - "type_info": "Numeric" - } - ], - "parameters": { - "Left": [ - "Varchar" - ] - }, - "nullable": [ - false - ] - }, - "hash": "0d7b2cee7e790ce2f620a2def603a347a745d2ea64ded9c7021c21cb56a2ee86" -} diff --git a/.sqlx/query-90499a591252a71ff3ed78f2662c7897e45450634c4f2e1a7a4ece72e53f056c.json b/.sqlx/query-90499a591252a71ff3ed78f2662c7897e45450634c4f2e1a7a4ece72e53f056c.json deleted file mode 100644 index adf8308..0000000 --- a/.sqlx/query-90499a591252a71ff3ed78f2662c7897e45450634c4f2e1a7a4ece72e53f056c.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO members(\n id, guild_id, deaf, mute, pending, settings, bio, pronouns, joined_at)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, current_timestamp);", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Numeric", - "Numeric", - "Bool", - "Bool", - "Bool", - "Text", - "Varchar", - "Varchar" - ] - }, - "nullable": [] - }, - "hash": "90499a591252a71ff3ed78f2662c7897e45450634c4f2e1a7a4ece72e53f056c" -} diff --git a/.sqlx/query-93f9974c9aacce5d679f93ae721750630ecb9ba50f209b2818c38251720f397b.json b/.sqlx/query-93f9974c9aacce5d679f93ae721750630ecb9ba50f209b2818c38251720f397b.json deleted file mode 100644 index 8dec26d..0000000 --- a/.sqlx/query-93f9974c9aacce5d679f93ae721750630ecb9ba50f209b2818c38251720f397b.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO relationships(from_id, to_id, nickname, type) VALUES($1, $2, $3, $4);", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Numeric", - "Numeric", - "Varchar", - "Numeric" - ] - }, - "nullable": [] - }, - "hash": "93f9974c9aacce5d679f93ae721750630ecb9ba50f209b2818c38251720f397b" -} diff --git a/.sqlx/query-b8fccc1abcb63d1442a3136927d9d0e4ca3b479d01ea7446ac7c120435c52584.json b/.sqlx/query-b8fccc1abcb63d1442a3136927d9d0e4ca3b479d01ea7446ac7c120435c52584.json deleted file mode 100644 index 2f2373d..0000000 --- a/.sqlx/query-b8fccc1abcb63d1442a3136927d9d0e4ca3b479d01ea7446ac7c120435c52584.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT guild_id FROM members where id = $1 LIMIT $2", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "guild_id", - "type_info": "Numeric" - } - ], - "parameters": { - "Left": [ - "Numeric", - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "b8fccc1abcb63d1442a3136927d9d0e4ca3b479d01ea7446ac7c120435c52584" -} diff --git a/README.md b/README.md index e34073b..7d72e66 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,8 @@ DATABASE_PORT=[Postgres port, usually 5432] DATABASE_USERNAME=[Your Postgres username] DATABASE_PASSWORD=[Your Postgres password] DATABASE_NAME=[Your Postgres database name] +API_BIND=[ip:port to bind the HTTP API server to. Defaults to 0.0.0.0:3001 if not set] +GATEWAY_BIND=[ip:port to bind the Gateway server to. Defaults to 0.0.0.0:3003 if not set] ``` 4. Install the sqlx CLI with `cargo install sqlx-cli` From f326ce0ecc6ceb94d5ef44b7e7bdffe5c4465cbe Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 20 Oct 2024 00:16:51 +0200 Subject: [PATCH 06/13] exit on receive dispatch --- src/gateway/gateway_task.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/gateway/gateway_task.rs b/src/gateway/gateway_task.rs index a2662f7..2a60c6c 100644 --- a/src/gateway/gateway_task.rs +++ b/src/gateway/gateway_task.rs @@ -40,7 +40,13 @@ pub(super) async fn gateway_task( match message_result { Ok(message_of_unknown_type) => { let event = unwrap_event(Event::try_from(message_of_unknown_type), connection.clone(), connection.kill_send.clone()); - // TODO: Handle event + if event.op_code() == Opcode::Dispatch { + // Receiving a dispatch event from a client is never correct + log::debug!(target: "symfonia::gateway::gateway_task", "Received an unexpected message: {:?}", event); + connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4002), reason: "DECODE_ERROR".into() }))); + connection.kill_send.send(()).expect("Failed to send kill_send"); + panic!("Killing gateway task: Received an unexpected message"); + } }, Err(error) => { connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4000), reason: "INTERNAL_SERVER_ERROR".into() }))); From 0935d454e277722402d7c2ebb94f11c10d300f82 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 20 Oct 2024 13:00:10 +0200 Subject: [PATCH 07/13] add some missing queries --- ...5cbb8488020c74eed1bb8e68777dd12d551f9.json | 22 ++++++++++++++++++ ...3a347a745d2ea64ded9c7021c21cb56a2ee86.json | 22 ++++++++++++++++++ ...c7897e45450634c4f2e1a7a4ece72e53f056c.json | 21 +++++++++++++++++ ...750630ecb9ba50f209b2818c38251720f397b.json | 17 ++++++++++++++ ...9d0e4ca3b479d01ea7446ac7c120435c52584.json | 23 +++++++++++++++++++ 5 files changed, 105 insertions(+) create mode 100644 .sqlx/query-00eb44be0581509631734824a9d5cbb8488020c74eed1bb8e68777dd12d551f9.json create mode 100644 .sqlx/query-0d7b2cee7e790ce2f620a2def603a347a745d2ea64ded9c7021c21cb56a2ee86.json create mode 100644 .sqlx/query-90499a591252a71ff3ed78f2662c7897e45450634c4f2e1a7a4ece72e53f056c.json create mode 100644 .sqlx/query-93f9974c9aacce5d679f93ae721750630ecb9ba50f209b2818c38251720f397b.json create mode 100644 .sqlx/query-b8fccc1abcb63d1442a3136927d9d0e4ca3b479d01ea7446ac7c120435c52584.json diff --git a/.sqlx/query-00eb44be0581509631734824a9d5cbb8488020c74eed1bb8e68777dd12d551f9.json b/.sqlx/query-00eb44be0581509631734824a9d5cbb8488020c74eed1bb8e68777dd12d551f9.json new file mode 100644 index 0000000..f36c175 --- /dev/null +++ b/.sqlx/query-00eb44be0581509631734824a9d5cbb8488020c74eed1bb8e68777dd12d551f9.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO users (id, username, discriminator, email, data, fingerprints, premium, premium_type, created_at, flags, public_flags, purchased_flags, premium_usage_flags, rights, extended_settings, settings_index) VALUES ($1, $2, $3, $4, $5, $6, false, 0, $7, 0, 0, 0, 0, $8, '{}', $9)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Numeric", + "Varchar", + "Varchar", + "Varchar", + "Json", + "Text", + "Timestamp", + "Numeric", + "Numeric" + ] + }, + "nullable": [] + }, + "hash": "00eb44be0581509631734824a9d5cbb8488020c74eed1bb8e68777dd12d551f9" +} diff --git a/.sqlx/query-0d7b2cee7e790ce2f620a2def603a347a745d2ea64ded9c7021c21cb56a2ee86.json b/.sqlx/query-0d7b2cee7e790ce2f620a2def603a347a745d2ea64ded9c7021c21cb56a2ee86.json new file mode 100644 index 0000000..11a74e1 --- /dev/null +++ b/.sqlx/query-0d7b2cee7e790ce2f620a2def603a347a745d2ea64ded9c7021c21cb56a2ee86.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO user_settings (locale) VALUES ($1) RETURNING index as inner", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "inner", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Varchar" + ] + }, + "nullable": [ + false + ] + }, + "hash": "0d7b2cee7e790ce2f620a2def603a347a745d2ea64ded9c7021c21cb56a2ee86" +} diff --git a/.sqlx/query-90499a591252a71ff3ed78f2662c7897e45450634c4f2e1a7a4ece72e53f056c.json b/.sqlx/query-90499a591252a71ff3ed78f2662c7897e45450634c4f2e1a7a4ece72e53f056c.json new file mode 100644 index 0000000..adf8308 --- /dev/null +++ b/.sqlx/query-90499a591252a71ff3ed78f2662c7897e45450634c4f2e1a7a4ece72e53f056c.json @@ -0,0 +1,21 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO members(\n id, guild_id, deaf, mute, pending, settings, bio, pronouns, joined_at)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, current_timestamp);", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Numeric", + "Numeric", + "Bool", + "Bool", + "Bool", + "Text", + "Varchar", + "Varchar" + ] + }, + "nullable": [] + }, + "hash": "90499a591252a71ff3ed78f2662c7897e45450634c4f2e1a7a4ece72e53f056c" +} diff --git a/.sqlx/query-93f9974c9aacce5d679f93ae721750630ecb9ba50f209b2818c38251720f397b.json b/.sqlx/query-93f9974c9aacce5d679f93ae721750630ecb9ba50f209b2818c38251720f397b.json new file mode 100644 index 0000000..8dec26d --- /dev/null +++ b/.sqlx/query-93f9974c9aacce5d679f93ae721750630ecb9ba50f209b2818c38251720f397b.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO relationships(from_id, to_id, nickname, type) VALUES($1, $2, $3, $4);", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Numeric", + "Numeric", + "Varchar", + "Numeric" + ] + }, + "nullable": [] + }, + "hash": "93f9974c9aacce5d679f93ae721750630ecb9ba50f209b2818c38251720f397b" +} diff --git a/.sqlx/query-b8fccc1abcb63d1442a3136927d9d0e4ca3b479d01ea7446ac7c120435c52584.json b/.sqlx/query-b8fccc1abcb63d1442a3136927d9d0e4ca3b479d01ea7446ac7c120435c52584.json new file mode 100644 index 0000000..2f2373d --- /dev/null +++ b/.sqlx/query-b8fccc1abcb63d1442a3136927d9d0e4ca3b479d01ea7446ac7c120435c52584.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT guild_id FROM members where id = $1 LIMIT $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "guild_id", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Numeric", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "b8fccc1abcb63d1442a3136927d9d0e4ca3b479d01ea7446ac7c120435c52584" +} From 714642ec90b04d5e4a8bf8a24198a937c6f1a44b Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 20 Oct 2024 17:52:52 +0200 Subject: [PATCH 08/13] logging time --- src/gateway/gateway_task.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/gateway/gateway_task.rs b/src/gateway/gateway_task.rs index 2a60c6c..6e52278 100644 --- a/src/gateway/gateway_task.rs +++ b/src/gateway/gateway_task.rs @@ -39,6 +39,7 @@ pub(super) async fn gateway_task( message_result = connection.receiver.recv() => { match message_result { Ok(message_of_unknown_type) => { + log::trace!(target: "symfonia::gateway::gateway_task", "Received raw message {:?}", message_of_unknown_type); let event = unwrap_event(Event::try_from(message_of_unknown_type), connection.clone(), connection.kill_send.clone()); if event.op_code() == Opcode::Dispatch { // Receiving a dispatch event from a client is never correct @@ -81,26 +82,26 @@ fn unwrap_event( match e { Error::Gateway(g) => match g { GatewayError::UnexpectedOpcode(o) => { - log::debug!(target: "symfonia::gateway::gateway_task", "Received an unexpected opcode: {:?}", o); + log::debug!(target: "symfonia::gateway::gateway_task::unwrap_event", "Received an unexpected opcode: {:?}", o); connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4001), reason: "UNKNOWN_OPCODE".into() }))); kill_send.send(()).expect("Failed to send kill_send"); panic!("Killing gateway task: Received an unexpected opcode"); } GatewayError::UnexpectedMessage(m) => { - log::debug!(target: "symfonia::gateway::gateway_task", "Received an unexpected message: {:?}", m); + log::debug!(target: "symfonia::gateway::gateway_task::unwrap_event", "Received an unexpected message: {:?}", m); connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4002), reason: "DECODE_ERROR".into() }))); kill_send.send(()).expect("Failed to send kill_send"); panic!("Killing gateway task: Received an unexpected message"); } _ => { - log::debug!(target: "symfonia::gateway::gateway_task", "Received an unexpected error: {:?}", g); + log::debug!(target: "symfonia::gateway::gateway_task::unwrap_event", "Received an unexpected error: {:?}", g); connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4000), reason: "INTERNAL_SERVER_ERROR".into() }))); kill_send.send(()).expect("Failed to send kill_send"); panic!("Killing gateway task: Received an unexpected error"); } }, _ => { - log::debug!(target: "symfonia::gateway::gateway_task", "Received an unexpected error: {:?}", e); + log::debug!(target: "symfonia::gateway::gateway_task::unwrap_event", "Received an unexpected error: {:?}", e); connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4000), reason: "INTERNAL_SERVER_ERROR".into() }))); kill_send.send(()).expect("Failed to send kill_send"); panic!("Killing gateway task: Received an unexpected error"); From 9e864fb317938f1b9dc325bbc06b723a7103be12 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 20 Oct 2024 17:53:02 +0200 Subject: [PATCH 09/13] try untagged to see if that fixes stuff --- src/gateway/types/event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gateway/types/event.rs b/src/gateway/types/event.rs index 5819042..43133bc 100644 --- a/src/gateway/types/event.rs +++ b/src/gateway/types/event.rs @@ -108,7 +108,7 @@ impl From for Opcode { /// /// The types `T` in `GatewayPayload` might not yet be correct or complete for all events. Please /// feel free to file a PR or an issue should you find any discrepancies. -#[serde(rename_all = "PascalCase")] +#[serde(untagged)] pub enum Event { Hello(GatewayHello), Heartbeat(GatewayHeartbeat), From 24bad18d78ff64961005c89f547a777dfab89fce Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 20 Oct 2024 20:49:48 +0200 Subject: [PATCH 10/13] Fix broken implementation of WebSocketConnection adapter --- src/gateway/types/mod.rs | 56 +++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 33 deletions(-) diff --git a/src/gateway/types/mod.rs b/src/gateway/types/mod.rs index d2acef3..bf13ce3 100644 --- a/src/gateway/types/mod.rs +++ b/src/gateway/types/mod.rs @@ -481,34 +481,36 @@ impl WebSocketConnection { /// Create a new [WebSocketConnection] from a tungstenite Sink/Stream pair. pub fn new(mut sink: WebSocketSend, mut stream: WebSocketReceive) -> Self { // "100" is an arbitrary limit. Feel free to adjust this, if you have a good reason for it. -bitfl0wer - let (mut sender, mut receiver) = tokio::sync::broadcast::channel(100); - let mut sender_sender_task = sender.clone(); - let mut receiver_sender_task = receiver.resubscribe(); + let (mut websocketsend_sender, mut websocketsend_receiver) = + tokio::sync::broadcast::channel(100); + let (mut websocketreceive_sender, mut websocketreceive_receiver) = + tokio::sync::broadcast::channel(100); + // The sender task concerns itself with sending messages to the WebSocket client. let sender_task = tokio::spawn(async move { log::trace!(target: "symfonia::gateway::types::WebSocketConnection", "spawned sender_task"); loop { let message: Result = - receiver_sender_task.recv().await; + websocketsend_receiver.recv().await; match message { Ok(msg) => { let send_result = sink.send(msg).await; match send_result { Ok(_) => (), - Err(_) => { - sender_sender_task.send(Message::Close(Some(CloseFrame { - code: CloseCode::Error, - reason: "Channel closed or error encountered".into(), - }))); - return; + Err(e) => { + log::debug!(target: "symfonia::gateway::types::WebSocketConnection::sender_task", "Error when sending message to WebSocket: {e}"); + break; } } } - Err(_) => return, + Err(e) => { + log::debug!(target: "symfonia::gateway::types::WebSocketConnection::sender_task", "Error when trying to receive through websocketsend_receiver: {e}"); + break; + } } } }); - let sender_receiver_task = sender.clone(); + // The receiver task receives messages from the WebSocket client and sends them to the // broadcast channel. let receiver_task = tokio::spawn(async move { @@ -517,42 +519,30 @@ impl WebSocketConnection { let web_socket_receive_result = match stream.next().await { Some(res) => res, None => { - log::debug!(target: "symfonia::gateway::WebSocketConnection", "WebSocketReceive yielded None. Sending close message..."); - sender_receiver_task.send(Message::Close(Some(CloseFrame { - code: CloseCode::Error, - reason: "Channel closed or error encountered".into(), - }))); - return; + log::debug!(target: "symfonia::gateway::WebSocketConnection::receiver_task", "WebSocketReceive yielded None. Closing channel"); + break; } }; let web_socket_receive_message = match web_socket_receive_result { Ok(message) => message, Err(e) => { - log::error!(target: "symfonia::gateway::WebSocketConnection", "Received malformed message, closing channel: {e}"); - sender_receiver_task.send(Message::Close(Some(CloseFrame { - code: CloseCode::Error, - reason: "Channel closed or error encountered".into(), - }))); - return; + log::debug!(target: "symfonia::gateway::WebSocketConnection::receiver_task", "Received malformed message, closing channel: {e}"); + break; } }; - match sender_receiver_task.send(web_socket_receive_message) { + match websocketreceive_sender.send(web_socket_receive_message) { Ok(_) => (), Err(e) => { - log::error!(target: "symfonia::gateway::WebSocketConnection", "Unable to send received WebSocket message to channel recipients. Closing channel: {e}"); - sender_receiver_task.send(Message::Close(Some(CloseFrame { - code: CloseCode::Error, - reason: "Channel closed or error encountered".into(), - }))); - return; + log::debug!(target: "symfonia::gateway::WebSocketConnection::receiver_task", "Unable to send received WebSocket message to channel recipients. Closing channel: {e}"); + break; } } } }); let (kill_send, kill_receive) = tokio::sync::broadcast::channel(1); Self { - sender, - receiver, + sender: websocketsend_sender, + receiver: websocketreceive_receiver, sender_task: Arc::new(sender_task), receiver_task: Arc::new(receiver_task), kill_receive, From c1d29485d9bd6ba721fa1d9c7931a93ddbdc819f Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 20 Oct 2024 21:11:45 +0200 Subject: [PATCH 11/13] log event type of received message --- src/gateway/gateway_task.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/gateway/gateway_task.rs b/src/gateway/gateway_task.rs index 6e52278..fa867fc 100644 --- a/src/gateway/gateway_task.rs +++ b/src/gateway/gateway_task.rs @@ -41,6 +41,7 @@ pub(super) async fn gateway_task( Ok(message_of_unknown_type) => { log::trace!(target: "symfonia::gateway::gateway_task", "Received raw message {:?}", message_of_unknown_type); let event = unwrap_event(Event::try_from(message_of_unknown_type), connection.clone(), connection.kill_send.clone()); + log::trace!(target: "symfonia::gateway::gateway_task", "Event type of received message: {:?}", event); if event.op_code() == Opcode::Dispatch { // Receiving a dispatch event from a client is never correct log::debug!(target: "symfonia::gateway::gateway_task", "Received an unexpected message: {:?}", event); From d80cd806b09a39db3cd47f4df0fd4869a9d78067 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 20 Oct 2024 21:15:17 +0200 Subject: [PATCH 12/13] change raw_gateway_payload type to Option This fixes the server erroring when the first client heartbeat with "null" data is received --- src/gateway/types/event.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/gateway/types/event.rs b/src/gateway/types/event.rs index 43133bc..33bd26b 100644 --- a/src/gateway/types/event.rs +++ b/src/gateway/types/event.rs @@ -210,7 +210,9 @@ impl TryFrom for Event { fn try_from(message: tokio_tungstenite::tungstenite::Message) -> Result { /// Takes a message of unknown type as input and tries to convert it to an [Event]. let message_as_string = message.to_string(); - let raw_gateway_payload: GatewayPayload = from_str(&message_as_string)?; + // Payload type of option string is okay, since raw_gateway_payload is only used to look at + // the opcode and, if the opcode is 0 (= dispatch), the event name in the received message + let raw_gateway_payload: GatewayPayload> = from_str(&message_as_string)?; match Opcode::try_from(raw_gateway_payload.op_code).map_err(|_| { Error::Gateway(GatewayError::UnexpectedOpcode( raw_gateway_payload.op_code.into(), From a919632d649eb906b5c13c0ad9943a01d9568ded Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 20 Oct 2024 22:57:54 +0200 Subject: [PATCH 13/13] send heartbeats to heartbeat handler --- src/gateway/gateway_task.rs | 31 +++++++++++++++++++++++++------ src/gateway/heartbeat.rs | 1 - 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/gateway/gateway_task.rs b/src/gateway/gateway_task.rs index fa867fc..b5e4520 100644 --- a/src/gateway/gateway_task.rs +++ b/src/gateway/gateway_task.rs @@ -42,13 +42,32 @@ pub(super) async fn gateway_task( log::trace!(target: "symfonia::gateway::gateway_task", "Received raw message {:?}", message_of_unknown_type); let event = unwrap_event(Event::try_from(message_of_unknown_type), connection.clone(), connection.kill_send.clone()); log::trace!(target: "symfonia::gateway::gateway_task", "Event type of received message: {:?}", event); - if event.op_code() == Opcode::Dispatch { - // Receiving a dispatch event from a client is never correct - log::debug!(target: "symfonia::gateway::gateway_task", "Received an unexpected message: {:?}", event); - connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4002), reason: "DECODE_ERROR".into() }))); - connection.kill_send.send(()).expect("Failed to send kill_send"); - panic!("Killing gateway task: Received an unexpected message"); + match event { + Event::Dispatch(_) => { + // Receiving a dispatch event from a client is never correct + log::debug!(target: "symfonia::gateway::gateway_task", "Received an unexpected message: {:?}", event); + connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4002), reason: "DECODE_ERROR".into() }))); + connection.kill_send.send(()).expect("Failed to send kill_send"); + panic!("Killing gateway task: Received an unexpected message"); + }, + Event::Heartbeat(hearbeat_event) => { + match heartbeat_send.send(hearbeat_event) { + Err(e) => { + log::debug!(target: "symfonia::gateway::gateway_task", "Received Heartbeat but HeartbeatHandler seems to be dead?"); + connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4002), reason: "DECODE_ERROR".into() }))); + connection.kill_send.send(()).expect("Failed to send kill_send"); + panic!("Killing gateway task: Received an unexpected message"); + }, + Ok(_) => { + log::trace!(target: "symfonia::gateway::gateway_task", "Forwarded heartbeat message to HeartbeatHandler!"); + } + } + } + _ => { + log::error!(target: "symfonia::gateway::gateway_task", "Received an event type for which no code is yet implemented in the gateway_task. Please open a issue or PR at the symfonia repository. {:?}", event); + } } + }, Err(error) => { connection.sender.send(Message::Close(Some(CloseFrame { code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Library(4000), reason: "INTERNAL_SERVER_ERROR".into() }))); diff --git a/src/gateway/heartbeat.rs b/src/gateway/heartbeat.rs index 7164e56..5b792a7 100644 --- a/src/gateway/heartbeat.rs +++ b/src/gateway/heartbeat.rs @@ -134,7 +134,6 @@ impl HeartbeatHandler { } }; self.connection.kill_send.send(()).expect("Failed to send kill signal in heartbeat_handler"); - return; } } }