Update: websocket optimization
- Use protobuf - Eliminate unnecessary optimization logic - UART, sensor, status data transmitted as pb data Signed-off-by: YoungSoo Shin <shinys000114@gmail.com>
This commit is contained in:
@@ -2,13 +2,15 @@
|
||||
// Created by shinys on 25. 8. 18..
|
||||
//
|
||||
|
||||
#include "cJSON.h"
|
||||
#include "driver/uart.h"
|
||||
#include "esp_err.h"
|
||||
#include "esp_http_server.h"
|
||||
#include "esp_log.h"
|
||||
#include "freertos/semphr.h"
|
||||
#include "nconfig.h"
|
||||
#include "pb.h"
|
||||
#include "pb_encode.h"
|
||||
#include "status.pb.h"
|
||||
#include "webserver.h"
|
||||
|
||||
#define UART_NUM UART_NUM_1
|
||||
@@ -16,13 +18,13 @@
|
||||
#define UART_TX_PIN CONFIG_GPIO_UART_TX
|
||||
#define UART_RX_PIN CONFIG_GPIO_UART_RX
|
||||
#define CHUNK_SIZE (1024)
|
||||
#define PB_UART_BUFFER_SIZE (CHUNK_SIZE + 64)
|
||||
|
||||
static const char* TAG = "ws-uart";
|
||||
|
||||
static int client_fd = -1;
|
||||
static SemaphoreHandle_t client_fd_mutex;
|
||||
|
||||
// Unified message structure for the websocket queue
|
||||
enum ws_message_type
|
||||
{
|
||||
WS_MSG_STATUS,
|
||||
@@ -32,25 +34,28 @@ enum ws_message_type
|
||||
struct ws_message
|
||||
{
|
||||
enum ws_message_type type;
|
||||
uint8_t* data;
|
||||
size_t len;
|
||||
};
|
||||
|
||||
union
|
||||
{
|
||||
struct
|
||||
{
|
||||
cJSON* data;
|
||||
} status;
|
||||
|
||||
struct
|
||||
{
|
||||
uint8_t* data;
|
||||
size_t len;
|
||||
} uart;
|
||||
} content;
|
||||
struct bytes_arg
|
||||
{
|
||||
const void* data;
|
||||
size_t len;
|
||||
};
|
||||
|
||||
static QueueHandle_t ws_queue;
|
||||
|
||||
// Unified task to send data from the queue to the websocket client
|
||||
static bool encode_bytes_callback(pb_ostream_t* stream, const pb_field_t* field, void* const* arg)
|
||||
{
|
||||
struct bytes_arg* br = (struct bytes_arg*)(*arg);
|
||||
if (!pb_encode_tag_for_field(stream, field))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return pb_encode_string(stream, (uint8_t*)br->data, br->len);
|
||||
}
|
||||
|
||||
static void unified_ws_sender_task(void* arg)
|
||||
{
|
||||
httpd_handle_t server = (httpd_handle_t)arg;
|
||||
@@ -67,41 +72,17 @@ static void unified_ws_sender_task(void* arg)
|
||||
if (fd <= 0)
|
||||
{
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
// Free memory if client is not connected
|
||||
if (msg.type == WS_MSG_STATUS)
|
||||
{
|
||||
cJSON_Delete(msg.content.status.data);
|
||||
}
|
||||
else
|
||||
{
|
||||
free(msg.content.uart.data);
|
||||
}
|
||||
free(msg.data);
|
||||
continue;
|
||||
}
|
||||
|
||||
httpd_ws_frame_t ws_pkt = {0};
|
||||
esp_err_t err = ESP_FAIL;
|
||||
ws_pkt.payload = msg.data;
|
||||
ws_pkt.len = msg.len;
|
||||
ws_pkt.type = HTTPD_WS_TYPE_BINARY;
|
||||
|
||||
if (msg.type == WS_MSG_STATUS)
|
||||
{
|
||||
char* json_string = cJSON_Print(msg.content.status.data);
|
||||
cJSON_Delete(msg.content.status.data);
|
||||
|
||||
ws_pkt.payload = (uint8_t*)json_string;
|
||||
ws_pkt.len = strlen(json_string);
|
||||
ws_pkt.type = HTTPD_WS_TYPE_TEXT;
|
||||
err = httpd_ws_send_frame_async(server, fd, &ws_pkt);
|
||||
free(json_string);
|
||||
}
|
||||
else
|
||||
{
|
||||
// WS_MSG_UART
|
||||
ws_pkt.payload = msg.content.uart.data;
|
||||
ws_pkt.len = msg.content.uart.len;
|
||||
ws_pkt.type = HTTPD_WS_TYPE_BINARY;
|
||||
err = httpd_ws_send_frame_async(server, fd, &ws_pkt);
|
||||
free(msg.content.uart.data);
|
||||
}
|
||||
esp_err_t err = httpd_ws_send_frame_async(server, fd, &ws_pkt);
|
||||
free(msg.data);
|
||||
|
||||
if (err != ESP_OK)
|
||||
{
|
||||
@@ -114,7 +95,6 @@ static void unified_ws_sender_task(void* arg)
|
||||
}
|
||||
else
|
||||
{
|
||||
// Queue receive timed out, send a PING to keep connection alive
|
||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||
int fd = client_fd;
|
||||
if (fd > 0)
|
||||
@@ -138,125 +118,77 @@ static void unified_ws_sender_task(void* arg)
|
||||
static void uart_polling_task(void* arg)
|
||||
{
|
||||
static uint8_t data_buf[BUF_SIZE];
|
||||
const TickType_t MIN_POLLING_INTERVAL = pdMS_TO_TICKS(1);
|
||||
const TickType_t MAX_POLLING_INTERVAL = pdMS_TO_TICKS(10);
|
||||
const TickType_t READ_TIMEOUT = pdMS_TO_TICKS(5);
|
||||
|
||||
TickType_t current_interval = MIN_POLLING_INTERVAL;
|
||||
int consecutive_empty_polls = 0;
|
||||
int cached_client_fd = -1;
|
||||
TickType_t last_client_check = 0;
|
||||
const TickType_t CLIENT_CHECK_INTERVAL = pdMS_TO_TICKS(100);
|
||||
static uint8_t pb_buffer[PB_UART_BUFFER_SIZE];
|
||||
|
||||
while (1)
|
||||
{
|
||||
TickType_t current_time = xTaskGetTickCount();
|
||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||
int fd = client_fd;
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
|
||||
if (current_time - last_client_check >= CLIENT_CHECK_INTERVAL)
|
||||
if (fd <= 0)
|
||||
{
|
||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||
cached_client_fd = client_fd;
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
last_client_check = current_time;
|
||||
vTaskDelay(pdMS_TO_TICKS(100));
|
||||
continue;
|
||||
}
|
||||
|
||||
size_t available_len;
|
||||
esp_err_t err = uart_get_buffered_data_len(UART_NUM, &available_len);
|
||||
uart_get_buffered_data_len(UART_NUM, &available_len);
|
||||
|
||||
if (err != ESP_OK || available_len == 0)
|
||||
if (available_len == 0)
|
||||
{
|
||||
consecutive_empty_polls++;
|
||||
if (consecutive_empty_polls > 5)
|
||||
{
|
||||
current_interval = MAX_POLLING_INTERVAL;
|
||||
}
|
||||
else if (consecutive_empty_polls > 2)
|
||||
{
|
||||
current_interval = pdMS_TO_TICKS(5);
|
||||
}
|
||||
|
||||
if (cached_client_fd <= 0)
|
||||
{
|
||||
vTaskDelay(pdMS_TO_TICKS(50));
|
||||
continue;
|
||||
}
|
||||
|
||||
vTaskDelay(current_interval);
|
||||
vTaskDelay(pdMS_TO_TICKS(10));
|
||||
continue;
|
||||
}
|
||||
|
||||
consecutive_empty_polls = 0;
|
||||
current_interval = MIN_POLLING_INTERVAL;
|
||||
size_t read_len = (available_len > BUF_SIZE) ? BUF_SIZE : available_len;
|
||||
int bytes_read = uart_read_bytes(UART_NUM, data_buf, read_len, pdMS_TO_TICKS(5));
|
||||
|
||||
if (cached_client_fd <= 0)
|
||||
{
|
||||
uart_flush_input(UART_NUM);
|
||||
continue;
|
||||
}
|
||||
|
||||
size_t total_processed = 0;
|
||||
while (available_len > 0 && total_processed < BUF_SIZE)
|
||||
{
|
||||
size_t read_size =
|
||||
(available_len > (BUF_SIZE - total_processed)) ? (BUF_SIZE - total_processed) : available_len;
|
||||
|
||||
int bytes_read = uart_read_bytes(UART_NUM, data_buf + total_processed, read_size, READ_TIMEOUT);
|
||||
|
||||
if (bytes_read <= 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
total_processed += bytes_read;
|
||||
available_len -= bytes_read;
|
||||
|
||||
uart_get_buffered_data_len(UART_NUM, &available_len);
|
||||
}
|
||||
|
||||
if (total_processed > 0)
|
||||
if (bytes_read > 0)
|
||||
{
|
||||
size_t offset = 0;
|
||||
|
||||
while (offset < total_processed)
|
||||
while (offset < bytes_read)
|
||||
{
|
||||
const size_t chunk_size =
|
||||
(total_processed - offset > CHUNK_SIZE) ? CHUNK_SIZE : (total_processed - offset);
|
||||
size_t chunk_size = (bytes_read - offset > CHUNK_SIZE) ? CHUNK_SIZE : (bytes_read - offset);
|
||||
|
||||
StatusMessage message = StatusMessage_init_zero;
|
||||
message.which_payload = StatusMessage_uart_data_tag;
|
||||
struct bytes_arg arg = {.data = data_buf + offset, .len = chunk_size};
|
||||
message.payload.uart_data.data.funcs.encode = &encode_bytes_callback;
|
||||
message.payload.uart_data.data.arg = &arg;
|
||||
|
||||
pb_ostream_t stream = pb_ostream_from_buffer(pb_buffer, sizeof(pb_buffer));
|
||||
if (!pb_encode(&stream, StatusMessage_fields, &message))
|
||||
{
|
||||
ESP_LOGE(TAG, "Failed to encode uart data: %s", PB_GET_ERROR(&stream));
|
||||
offset += chunk_size;
|
||||
continue;
|
||||
}
|
||||
|
||||
struct ws_message msg;
|
||||
msg.type = WS_MSG_UART;
|
||||
msg.content.uart.data = malloc(chunk_size);
|
||||
if (!msg.content.uart.data)
|
||||
msg.len = stream.bytes_written;
|
||||
msg.data = malloc(msg.len);
|
||||
|
||||
if (!msg.data)
|
||||
{
|
||||
ESP_LOGE(TAG, "Failed to allocate memory for uart ws msg");
|
||||
break;
|
||||
offset += chunk_size;
|
||||
continue;
|
||||
}
|
||||
|
||||
memcpy(msg.content.uart.data, data_buf + offset, chunk_size);
|
||||
msg.content.uart.len = chunk_size;
|
||||
memcpy(msg.data, pb_buffer, msg.len);
|
||||
|
||||
if (xQueueSend(ws_queue, &msg, 0) != pdPASS)
|
||||
if (xQueueSend(ws_queue, &msg, pdMS_TO_TICKS(10)) != pdPASS)
|
||||
{
|
||||
if (xQueueSend(ws_queue, &msg, pdMS_TO_TICKS(5)) != pdPASS)
|
||||
{
|
||||
ESP_LOGW(TAG, "ws sender queue full, dropping %zu bytes", chunk_size);
|
||||
free(msg.content.uart.data);
|
||||
}
|
||||
ESP_LOGW(TAG, "ws sender queue full, dropping %zu bytes", chunk_size);
|
||||
free(msg.data);
|
||||
}
|
||||
|
||||
offset += chunk_size;
|
||||
}
|
||||
}
|
||||
|
||||
if (available_len > 0)
|
||||
{
|
||||
vTaskDelay(MIN_POLLING_INTERVAL);
|
||||
}
|
||||
else
|
||||
{
|
||||
vTaskDelay(current_interval);
|
||||
}
|
||||
}
|
||||
|
||||
vTaskDelete(NULL);
|
||||
}
|
||||
|
||||
@@ -267,20 +199,17 @@ static esp_err_t ws_handler(httpd_req_t* req)
|
||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||
if (client_fd > 0)
|
||||
{
|
||||
// A client is already connected. Reject the new connection.
|
||||
ESP_LOGW(TAG, "Another client tried to connect, but a session is already active. Rejecting.");
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
httpd_resp_send_err(req, HTTPD_403_FORBIDDEN, "Another client is already connected");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
// No client is connected. Accept the new one.
|
||||
int new_fd = httpd_req_to_sockfd(req);
|
||||
ESP_LOGI(TAG, "Accepting new websocket connection: %d", new_fd);
|
||||
client_fd = new_fd;
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
|
||||
// Reset queue and flush UART buffer for the new session
|
||||
xQueueReset(ws_queue);
|
||||
uart_flush_input(UART_NUM);
|
||||
return ESP_OK;
|
||||
@@ -312,7 +241,6 @@ static esp_err_t ws_handler(httpd_req_t* req)
|
||||
void register_ws_endpoint(httpd_handle_t server)
|
||||
{
|
||||
size_t baud_rate_len;
|
||||
|
||||
nconfig_get_str_len(UART_BAUD_RATE, &baud_rate_len);
|
||||
char buf[baud_rate_len];
|
||||
nconfig_read(UART_BAUD_RATE, buf, baud_rate_len);
|
||||
@@ -323,32 +251,38 @@ void register_ws_endpoint(httpd_handle_t server)
|
||||
.parity = UART_PARITY_DISABLE,
|
||||
.stop_bits = UART_STOP_BITS_1,
|
||||
.flow_ctrl = UART_HW_FLOWCTRL_DISABLE,
|
||||
// .source_clk = UART_SCLK_APB,
|
||||
};
|
||||
|
||||
ESP_ERROR_CHECK(uart_param_config(UART_NUM, &uart_config));
|
||||
ESP_ERROR_CHECK(uart_set_pin(UART_NUM, UART_TX_PIN, UART_RX_PIN, UART_PIN_NO_CHANGE, UART_PIN_NO_CHANGE));
|
||||
ESP_ERROR_CHECK(uart_driver_install(UART_NUM, BUF_SIZE * 2, BUF_SIZE * 2, 0, NULL, 0));
|
||||
ESP_ERROR_CHECK(uart_driver_install(UART_NUM, BUF_SIZE * 2, BUF_SIZE * 2, 20, NULL, 0));
|
||||
|
||||
httpd_uri_t ws = {.uri = "/ws", .method = HTTP_GET, .handler = ws_handler, .user_ctx = NULL, .is_websocket = true};
|
||||
httpd_register_uri_handler(server, &ws);
|
||||
|
||||
client_fd_mutex = xSemaphoreCreateMutex();
|
||||
ws_queue = xQueueCreate(10, sizeof(struct ws_message)); // Combined queue
|
||||
ws_queue = xQueueCreate(10, sizeof(struct ws_message));
|
||||
|
||||
xTaskCreate(uart_polling_task, "uart_polling_task", 1024 * 4, NULL, 8, NULL);
|
||||
xTaskCreate(unified_ws_sender_task, "ws_sender_task", 1024 * 6, server, 9, NULL);
|
||||
}
|
||||
|
||||
void push_data_to_ws(cJSON* data)
|
||||
void push_data_to_ws(const uint8_t* data, size_t len)
|
||||
{
|
||||
struct ws_message msg;
|
||||
msg.type = WS_MSG_STATUS;
|
||||
msg.content.status.data = data;
|
||||
msg.data = malloc(len);
|
||||
if (!msg.data)
|
||||
{ ESP_LOGE(TAG, "Failed to allocate memory for status ws msg");
|
||||
return;
|
||||
}
|
||||
memcpy(msg.data, data, len);
|
||||
msg.len = len;
|
||||
|
||||
if (xQueueSend(ws_queue, &msg, pdMS_TO_TICKS(10)) != pdPASS)
|
||||
{
|
||||
ESP_LOGW(TAG, "WS queue full, dropping status message");
|
||||
cJSON_Delete(data);
|
||||
free(msg.data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user