From 30c5d2621375aa6b1ff98ed5b0e35693a422f6c7 Mon Sep 17 00:00:00 2001 From: YoungSoo Shin Date: Thu, 28 Aug 2025 12:19:33 +0900 Subject: [PATCH] Websocket optimization Signed-off-by: YoungSoo Shin --- main/service/ws.c | 152 +++++++++++++++++++++++----------------------- 1 file changed, 77 insertions(+), 75 deletions(-) diff --git a/main/service/ws.c b/main/service/ws.c index 019535e..6e63803 100644 --- a/main/service/ws.c +++ b/main/service/ws.c @@ -15,62 +15,92 @@ #define BUF_SIZE (2048) #define UART_TX_PIN CONFIG_GPIO_UART_TX #define UART_RX_PIN CONFIG_GPIO_UART_RX +#define CHUNK_SIZE (1024) static const char *TAG = "ws-uart"; static int client_fd = -1; static SemaphoreHandle_t client_fd_mutex; -struct status_message -{ - cJSON *data; +// Unified message structure for the websocket queue +enum ws_message_type { + WS_MSG_STATUS, + WS_MSG_UART }; -struct uart_to_ws_message +struct ws_message { - uint8_t *data; - size_t len; + enum ws_message_type type; + union { + struct { + cJSON *data; + } status; + struct { + uint8_t *data; + size_t len; + } uart; + } content; }; -QueueHandle_t status_queue; -static QueueHandle_t uart_to_ws_queue; +static QueueHandle_t ws_queue; -// Status task -static void status_task(void *arg) +// Unified task to send data from the queue to the websocket client +static void unified_ws_sender_task(void *arg) { httpd_handle_t server = (httpd_handle_t)arg; - struct status_message msg; + struct ws_message msg; const TickType_t PING_INTERVAL = pdMS_TO_TICKS(5000); while (1) { - if (xQueueReceive(status_queue, &msg, PING_INTERVAL)) { - char *json_string = cJSON_Print(msg.data); - cJSON_Delete(msg.data); - + if (xQueueReceive(ws_queue, &msg, PING_INTERVAL)) { xSemaphoreTake(client_fd_mutex, portMAX_DELAY); int fd = client_fd; - if (fd > 0) { - httpd_ws_frame_t ws_pkt; - memset(&ws_pkt, 0, sizeof(httpd_ws_frame_t)); + + if (fd <= 0) { + xSemaphoreGive(client_fd_mutex); + // Free memory if client is not connected + if (msg.type == WS_MSG_STATUS) { + cJSON_Delete(msg.content.status.data); + } else { + free(msg.content.uart.data); + } + continue; + } + + httpd_ws_frame_t ws_pkt = {0}; + esp_err_t err = ESP_FAIL; + + if (msg.type == WS_MSG_STATUS) { + char *json_string = cJSON_Print(msg.content.status.data); + cJSON_Delete(msg.content.status.data); + ws_pkt.payload = (uint8_t *)json_string; ws_pkt.len = strlen(json_string); ws_pkt.type = HTTPD_WS_TYPE_TEXT; - esp_err_t err = httpd_ws_send_frame_async(server, fd, &ws_pkt); + err = httpd_ws_send_frame_async(server, fd, &ws_pkt); + free(json_string); - if (err != ESP_OK) { - ESP_LOGW(TAG, "status_task: async send failed for fd %d, error: %s", fd, esp_err_to_name(err)); - client_fd = -1; - } + } else { // WS_MSG_UART + ws_pkt.payload = msg.content.uart.data; + ws_pkt.len = msg.content.uart.len; + ws_pkt.type = HTTPD_WS_TYPE_BINARY; + err = httpd_ws_send_frame_async(server, fd, &ws_pkt); + free(msg.content.uart.data); } + + if (err != ESP_OK) { + ESP_LOGW(TAG, "unified_ws_sender_task: async send failed for fd %d, error: %s", fd, esp_err_to_name(err)); + client_fd = -1; + } + xSemaphoreGive(client_fd_mutex); - free(json_string); + } else { // Queue receive timed out, send a PING to keep connection alive xSemaphoreTake(client_fd_mutex, portMAX_DELAY); int fd = client_fd; if (fd > 0) { - httpd_ws_frame_t ping_pkt; - memset(&ping_pkt, 0, sizeof(httpd_ws_frame_t)); + httpd_ws_frame_t ping_pkt = {0}; ping_pkt.type = HTTPD_WS_TYPE_PING; ping_pkt.final = true; esp_err_t err = httpd_ws_send_frame_async(server, fd, &ping_pkt); @@ -84,33 +114,6 @@ static void status_task(void *arg) } } -static void ws_sender_task(void *arg) -{ - httpd_handle_t server = (httpd_handle_t)arg; - struct uart_to_ws_message msg; - - while (1) { - if (xQueueReceive(uart_to_ws_queue, &msg, portMAX_DELAY)) { - xSemaphoreTake(client_fd_mutex, portMAX_DELAY); - int fd = client_fd; - if (fd > 0) { - httpd_ws_frame_t ws_pkt = {0}; - ws_pkt.payload = msg.data; - ws_pkt.len = msg.len; - ws_pkt.type = HTTPD_WS_TYPE_BINARY; - - esp_err_t err = httpd_ws_send_frame_async(server, fd, &ws_pkt); - if (err != ESP_OK) { - ESP_LOGW(TAG, "ws_sender_task: async send failed for fd %d, error: %s", fd, esp_err_to_name(err)); - client_fd = -1; - } - } - xSemaphoreGive(client_fd_mutex); - free(msg.data); - } - } -} - static void uart_polling_task(void *arg) { static uint8_t data_buf[BUF_SIZE]; @@ -181,27 +184,27 @@ static void uart_polling_task(void *arg) } if (total_processed > 0) { - const size_t CHUNK_SIZE = 1024; size_t offset = 0; while (offset < total_processed) { - size_t chunk_size = (total_processed - offset > CHUNK_SIZE) ? - CHUNK_SIZE : (total_processed - offset); + const size_t chunk_size = (total_processed - offset > CHUNK_SIZE) ? + CHUNK_SIZE : (total_processed - offset); - struct uart_to_ws_message msg; - msg.data = malloc(chunk_size); - if (!msg.data) { + struct ws_message msg; + msg.type = WS_MSG_UART; + msg.content.uart.data = malloc(chunk_size); + if (!msg.content.uart.data) { ESP_LOGE(TAG, "Failed to allocate memory for uart ws msg"); break; } - memcpy(msg.data, data_buf + offset, chunk_size); - msg.len = chunk_size; + memcpy(msg.content.uart.data, data_buf + offset, chunk_size); + msg.content.uart.len = chunk_size; - if (xQueueSend(uart_to_ws_queue, &msg, 0) != pdPASS) { - if (xQueueSend(uart_to_ws_queue, &msg, pdMS_TO_TICKS(5)) != pdPASS) { + if (xQueueSend(ws_queue, &msg, 0) != pdPASS) { + if (xQueueSend(ws_queue, &msg, pdMS_TO_TICKS(5)) != pdPASS) { ESP_LOGW(TAG, "ws sender queue full, dropping %zu bytes", chunk_size); - free(msg.data); + free(msg.content.uart.data); } } @@ -236,9 +239,8 @@ static esp_err_t ws_handler(httpd_req_t *req) { client_fd = new_fd; xSemaphoreGive(client_fd_mutex); - // Reset queues and flush UART buffer for the new session - xQueueReset(status_queue); - xQueueReset(uart_to_ws_queue); + // Reset queue and flush UART buffer for the new session + xQueueReset(ws_queue); uart_flush_input(UART_NUM); return ESP_OK; } @@ -295,21 +297,21 @@ void register_ws_endpoint(httpd_handle_t server) httpd_register_uri_handler(server, &ws); client_fd_mutex = xSemaphoreCreateMutex(); - status_queue = xQueueCreate(10, sizeof(struct status_message)); - uart_to_ws_queue = xQueueCreate(50, sizeof(struct uart_to_ws_message)); + ws_queue = xQueueCreate(10, sizeof(struct ws_message)); // Combined queue xTaskCreate(uart_polling_task, "uart_polling_task", 1024*4, NULL, 8, NULL); - xTaskCreate(status_task, "status_task", 4096, server, 8, NULL); - xTaskCreate(ws_sender_task, "ws_sender_task", 1024*6, server, 9, NULL); + xTaskCreate(unified_ws_sender_task, "ws_sender_task", 1024*6, server, 9, NULL); } void push_data_to_ws(cJSON *data) { - struct status_message msg; - msg.data = data; - if (xQueueSend(status_queue, &msg, 10) != pdPASS) + struct ws_message msg; + msg.type = WS_MSG_STATUS; + msg.content.status.data = data; + if (xQueueSend(ws_queue, &msg, pdMS_TO_TICKS(10)) != pdPASS) { - ESP_LOGW(TAG, "Queue full"); + ESP_LOGW(TAG, "WS queue full, dropping status message"); + cJSON_Delete(data); } }