Websocket optimization
Signed-off-by: YoungSoo Shin <shinys000114@gmail.com>
This commit is contained in:
@@ -15,62 +15,92 @@
|
|||||||
#define BUF_SIZE (2048)
|
#define BUF_SIZE (2048)
|
||||||
#define UART_TX_PIN CONFIG_GPIO_UART_TX
|
#define UART_TX_PIN CONFIG_GPIO_UART_TX
|
||||||
#define UART_RX_PIN CONFIG_GPIO_UART_RX
|
#define UART_RX_PIN CONFIG_GPIO_UART_RX
|
||||||
|
#define CHUNK_SIZE (1024)
|
||||||
|
|
||||||
static const char *TAG = "ws-uart";
|
static const char *TAG = "ws-uart";
|
||||||
|
|
||||||
static int client_fd = -1;
|
static int client_fd = -1;
|
||||||
static SemaphoreHandle_t client_fd_mutex;
|
static SemaphoreHandle_t client_fd_mutex;
|
||||||
|
|
||||||
struct status_message
|
// Unified message structure for the websocket queue
|
||||||
{
|
enum ws_message_type {
|
||||||
cJSON *data;
|
WS_MSG_STATUS,
|
||||||
|
WS_MSG_UART
|
||||||
};
|
};
|
||||||
|
|
||||||
struct uart_to_ws_message
|
struct ws_message
|
||||||
{
|
{
|
||||||
|
enum ws_message_type type;
|
||||||
|
union {
|
||||||
|
struct {
|
||||||
|
cJSON *data;
|
||||||
|
} status;
|
||||||
|
struct {
|
||||||
uint8_t *data;
|
uint8_t *data;
|
||||||
size_t len;
|
size_t len;
|
||||||
|
} uart;
|
||||||
|
} content;
|
||||||
};
|
};
|
||||||
|
|
||||||
QueueHandle_t status_queue;
|
static QueueHandle_t ws_queue;
|
||||||
static QueueHandle_t uart_to_ws_queue;
|
|
||||||
|
|
||||||
// Status task
|
// Unified task to send data from the queue to the websocket client
|
||||||
static void status_task(void *arg)
|
static void unified_ws_sender_task(void *arg)
|
||||||
{
|
{
|
||||||
httpd_handle_t server = (httpd_handle_t)arg;
|
httpd_handle_t server = (httpd_handle_t)arg;
|
||||||
struct status_message msg;
|
struct ws_message msg;
|
||||||
const TickType_t PING_INTERVAL = pdMS_TO_TICKS(5000);
|
const TickType_t PING_INTERVAL = pdMS_TO_TICKS(5000);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (xQueueReceive(status_queue, &msg, PING_INTERVAL)) {
|
if (xQueueReceive(ws_queue, &msg, PING_INTERVAL)) {
|
||||||
char *json_string = cJSON_Print(msg.data);
|
|
||||||
cJSON_Delete(msg.data);
|
|
||||||
|
|
||||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||||
int fd = client_fd;
|
int fd = client_fd;
|
||||||
if (fd > 0) {
|
|
||||||
httpd_ws_frame_t ws_pkt;
|
if (fd <= 0) {
|
||||||
memset(&ws_pkt, 0, sizeof(httpd_ws_frame_t));
|
xSemaphoreGive(client_fd_mutex);
|
||||||
|
// Free memory if client is not connected
|
||||||
|
if (msg.type == WS_MSG_STATUS) {
|
||||||
|
cJSON_Delete(msg.content.status.data);
|
||||||
|
} else {
|
||||||
|
free(msg.content.uart.data);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
httpd_ws_frame_t ws_pkt = {0};
|
||||||
|
esp_err_t err = ESP_FAIL;
|
||||||
|
|
||||||
|
if (msg.type == WS_MSG_STATUS) {
|
||||||
|
char *json_string = cJSON_Print(msg.content.status.data);
|
||||||
|
cJSON_Delete(msg.content.status.data);
|
||||||
|
|
||||||
ws_pkt.payload = (uint8_t *)json_string;
|
ws_pkt.payload = (uint8_t *)json_string;
|
||||||
ws_pkt.len = strlen(json_string);
|
ws_pkt.len = strlen(json_string);
|
||||||
ws_pkt.type = HTTPD_WS_TYPE_TEXT;
|
ws_pkt.type = HTTPD_WS_TYPE_TEXT;
|
||||||
esp_err_t err = httpd_ws_send_frame_async(server, fd, &ws_pkt);
|
err = httpd_ws_send_frame_async(server, fd, &ws_pkt);
|
||||||
|
free(json_string);
|
||||||
|
|
||||||
|
} else { // WS_MSG_UART
|
||||||
|
ws_pkt.payload = msg.content.uart.data;
|
||||||
|
ws_pkt.len = msg.content.uart.len;
|
||||||
|
ws_pkt.type = HTTPD_WS_TYPE_BINARY;
|
||||||
|
err = httpd_ws_send_frame_async(server, fd, &ws_pkt);
|
||||||
|
free(msg.content.uart.data);
|
||||||
|
}
|
||||||
|
|
||||||
if (err != ESP_OK) {
|
if (err != ESP_OK) {
|
||||||
ESP_LOGW(TAG, "status_task: async send failed for fd %d, error: %s", fd, esp_err_to_name(err));
|
ESP_LOGW(TAG, "unified_ws_sender_task: async send failed for fd %d, error: %s", fd, esp_err_to_name(err));
|
||||||
client_fd = -1;
|
client_fd = -1;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
xSemaphoreGive(client_fd_mutex);
|
xSemaphoreGive(client_fd_mutex);
|
||||||
free(json_string);
|
|
||||||
} else {
|
} else {
|
||||||
// Queue receive timed out, send a PING to keep connection alive
|
// Queue receive timed out, send a PING to keep connection alive
|
||||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||||
int fd = client_fd;
|
int fd = client_fd;
|
||||||
if (fd > 0) {
|
if (fd > 0) {
|
||||||
httpd_ws_frame_t ping_pkt;
|
httpd_ws_frame_t ping_pkt = {0};
|
||||||
memset(&ping_pkt, 0, sizeof(httpd_ws_frame_t));
|
|
||||||
ping_pkt.type = HTTPD_WS_TYPE_PING;
|
ping_pkt.type = HTTPD_WS_TYPE_PING;
|
||||||
ping_pkt.final = true;
|
ping_pkt.final = true;
|
||||||
esp_err_t err = httpd_ws_send_frame_async(server, fd, &ping_pkt);
|
esp_err_t err = httpd_ws_send_frame_async(server, fd, &ping_pkt);
|
||||||
@@ -84,33 +114,6 @@ static void status_task(void *arg)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void ws_sender_task(void *arg)
|
|
||||||
{
|
|
||||||
httpd_handle_t server = (httpd_handle_t)arg;
|
|
||||||
struct uart_to_ws_message msg;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
if (xQueueReceive(uart_to_ws_queue, &msg, portMAX_DELAY)) {
|
|
||||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
|
||||||
int fd = client_fd;
|
|
||||||
if (fd > 0) {
|
|
||||||
httpd_ws_frame_t ws_pkt = {0};
|
|
||||||
ws_pkt.payload = msg.data;
|
|
||||||
ws_pkt.len = msg.len;
|
|
||||||
ws_pkt.type = HTTPD_WS_TYPE_BINARY;
|
|
||||||
|
|
||||||
esp_err_t err = httpd_ws_send_frame_async(server, fd, &ws_pkt);
|
|
||||||
if (err != ESP_OK) {
|
|
||||||
ESP_LOGW(TAG, "ws_sender_task: async send failed for fd %d, error: %s", fd, esp_err_to_name(err));
|
|
||||||
client_fd = -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
xSemaphoreGive(client_fd_mutex);
|
|
||||||
free(msg.data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void uart_polling_task(void *arg)
|
static void uart_polling_task(void *arg)
|
||||||
{
|
{
|
||||||
static uint8_t data_buf[BUF_SIZE];
|
static uint8_t data_buf[BUF_SIZE];
|
||||||
@@ -181,27 +184,27 @@ static void uart_polling_task(void *arg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (total_processed > 0) {
|
if (total_processed > 0) {
|
||||||
const size_t CHUNK_SIZE = 1024;
|
|
||||||
size_t offset = 0;
|
size_t offset = 0;
|
||||||
|
|
||||||
while (offset < total_processed) {
|
while (offset < total_processed) {
|
||||||
size_t chunk_size = (total_processed - offset > CHUNK_SIZE) ?
|
const size_t chunk_size = (total_processed - offset > CHUNK_SIZE) ?
|
||||||
CHUNK_SIZE : (total_processed - offset);
|
CHUNK_SIZE : (total_processed - offset);
|
||||||
|
|
||||||
struct uart_to_ws_message msg;
|
struct ws_message msg;
|
||||||
msg.data = malloc(chunk_size);
|
msg.type = WS_MSG_UART;
|
||||||
if (!msg.data) {
|
msg.content.uart.data = malloc(chunk_size);
|
||||||
|
if (!msg.content.uart.data) {
|
||||||
ESP_LOGE(TAG, "Failed to allocate memory for uart ws msg");
|
ESP_LOGE(TAG, "Failed to allocate memory for uart ws msg");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(msg.data, data_buf + offset, chunk_size);
|
memcpy(msg.content.uart.data, data_buf + offset, chunk_size);
|
||||||
msg.len = chunk_size;
|
msg.content.uart.len = chunk_size;
|
||||||
|
|
||||||
if (xQueueSend(uart_to_ws_queue, &msg, 0) != pdPASS) {
|
if (xQueueSend(ws_queue, &msg, 0) != pdPASS) {
|
||||||
if (xQueueSend(uart_to_ws_queue, &msg, pdMS_TO_TICKS(5)) != pdPASS) {
|
if (xQueueSend(ws_queue, &msg, pdMS_TO_TICKS(5)) != pdPASS) {
|
||||||
ESP_LOGW(TAG, "ws sender queue full, dropping %zu bytes", chunk_size);
|
ESP_LOGW(TAG, "ws sender queue full, dropping %zu bytes", chunk_size);
|
||||||
free(msg.data);
|
free(msg.content.uart.data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -236,9 +239,8 @@ static esp_err_t ws_handler(httpd_req_t *req) {
|
|||||||
client_fd = new_fd;
|
client_fd = new_fd;
|
||||||
xSemaphoreGive(client_fd_mutex);
|
xSemaphoreGive(client_fd_mutex);
|
||||||
|
|
||||||
// Reset queues and flush UART buffer for the new session
|
// Reset queue and flush UART buffer for the new session
|
||||||
xQueueReset(status_queue);
|
xQueueReset(ws_queue);
|
||||||
xQueueReset(uart_to_ws_queue);
|
|
||||||
uart_flush_input(UART_NUM);
|
uart_flush_input(UART_NUM);
|
||||||
return ESP_OK;
|
return ESP_OK;
|
||||||
}
|
}
|
||||||
@@ -295,21 +297,21 @@ void register_ws_endpoint(httpd_handle_t server)
|
|||||||
httpd_register_uri_handler(server, &ws);
|
httpd_register_uri_handler(server, &ws);
|
||||||
|
|
||||||
client_fd_mutex = xSemaphoreCreateMutex();
|
client_fd_mutex = xSemaphoreCreateMutex();
|
||||||
status_queue = xQueueCreate(10, sizeof(struct status_message));
|
ws_queue = xQueueCreate(10, sizeof(struct ws_message)); // Combined queue
|
||||||
uart_to_ws_queue = xQueueCreate(50, sizeof(struct uart_to_ws_message));
|
|
||||||
|
|
||||||
xTaskCreate(uart_polling_task, "uart_polling_task", 1024*4, NULL, 8, NULL);
|
xTaskCreate(uart_polling_task, "uart_polling_task", 1024*4, NULL, 8, NULL);
|
||||||
xTaskCreate(status_task, "status_task", 4096, server, 8, NULL);
|
xTaskCreate(unified_ws_sender_task, "ws_sender_task", 1024*6, server, 9, NULL);
|
||||||
xTaskCreate(ws_sender_task, "ws_sender_task", 1024*6, server, 9, NULL);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void push_data_to_ws(cJSON *data)
|
void push_data_to_ws(cJSON *data)
|
||||||
{
|
{
|
||||||
struct status_message msg;
|
struct ws_message msg;
|
||||||
msg.data = data;
|
msg.type = WS_MSG_STATUS;
|
||||||
if (xQueueSend(status_queue, &msg, 10) != pdPASS)
|
msg.content.status.data = data;
|
||||||
|
if (xQueueSend(ws_queue, &msg, pdMS_TO_TICKS(10)) != pdPASS)
|
||||||
{
|
{
|
||||||
ESP_LOGW(TAG, "Queue full");
|
ESP_LOGW(TAG, "WS queue full, dropping status message");
|
||||||
|
cJSON_Delete(data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user