Refactor: Replace single-client WebSocket handling with multi-client support
Signed-off-by: YoungSoo Shin <shinys000114@gmail.com>
This commit is contained in:
@@ -68,6 +68,7 @@ void start_webserver(void)
|
||||
config.stack_size = 1024 * 8;
|
||||
config.max_uri_handlers = 10;
|
||||
config.task_priority = 12;
|
||||
config.max_open_sockets = 7;
|
||||
|
||||
if (httpd_start(&server, &config) != ESP_OK)
|
||||
{
|
||||
|
||||
@@ -22,9 +22,6 @@
|
||||
|
||||
static const char* TAG = "ws-uart";
|
||||
|
||||
static int client_fd = -1;
|
||||
static SemaphoreHandle_t client_fd_mutex;
|
||||
|
||||
enum ws_message_type
|
||||
{
|
||||
WS_MSG_STATUS,
|
||||
@@ -44,8 +41,10 @@ struct bytes_arg
|
||||
size_t len;
|
||||
};
|
||||
|
||||
#define MAX_CLIENT 7
|
||||
static QueueHandle_t ws_queue;
|
||||
static QueueHandle_t uart_event_queue;
|
||||
static int client_fds[MAX_CLIENT];
|
||||
|
||||
static bool encode_bytes_callback(pb_ostream_t* stream, const pb_field_t* field, void* const* arg)
|
||||
{
|
||||
@@ -61,18 +60,20 @@ static void unified_ws_sender_task(void* arg)
|
||||
{
|
||||
httpd_handle_t server = (httpd_handle_t)arg;
|
||||
struct ws_message msg;
|
||||
const TickType_t PING_INTERVAL = pdMS_TO_TICKS(5000);
|
||||
|
||||
while (1)
|
||||
{
|
||||
if (xQueueReceive(ws_queue, &msg, PING_INTERVAL))
|
||||
{
|
||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||
int fd = client_fd;
|
||||
|
||||
if (fd <= 0)
|
||||
size_t clients = MAX_CLIENT;
|
||||
if (httpd_get_client_list(server, &clients, client_fds) != ESP_OK)
|
||||
{
|
||||
free(msg.data);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (clients == 0)
|
||||
{
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
free(msg.data);
|
||||
continue;
|
||||
}
|
||||
@@ -82,39 +83,56 @@ static void unified_ws_sender_task(void* arg)
|
||||
ws_pkt.len = msg.len;
|
||||
ws_pkt.type = HTTPD_WS_TYPE_BINARY;
|
||||
|
||||
for (size_t i = 0; i < clients; ++i)
|
||||
{
|
||||
int fd = client_fds[i];
|
||||
if (httpd_ws_get_fd_info(server, fd) == HTTPD_WS_CLIENT_WEBSOCKET)
|
||||
{
|
||||
esp_err_t err = httpd_ws_send_frame_async(server, fd, &ws_pkt);
|
||||
free(msg.data);
|
||||
|
||||
if (err != ESP_OK)
|
||||
{
|
||||
ESP_LOGW(TAG, "unified_ws_sender_task: async send failed for fd %d, error: %s", fd,
|
||||
esp_err_to_name(err));
|
||||
client_fd = -1;
|
||||
}
|
||||
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
}
|
||||
}
|
||||
free(msg.data);
|
||||
}
|
||||
else
|
||||
{
|
||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||
int fd = client_fd;
|
||||
if (fd > 0)
|
||||
size_t clients = max_clients;
|
||||
if (httpd_get_client_list(server, &clients, client_fds) != ESP_OK)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (clients == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
httpd_ws_frame_t ping_pkt = {0};
|
||||
ping_pkt.type = HTTPD_WS_TYPE_PING;
|
||||
ping_pkt.final = true;
|
||||
|
||||
for (size_t i = 0; i < clients; ++i)
|
||||
{
|
||||
int fd = client_fds[i];
|
||||
if (httpd_ws_get_fd_info(server, fd) == HTTPD_WS_CLIENT_WEBSOCKET)
|
||||
{
|
||||
esp_err_t err = httpd_ws_send_frame_async(server, fd, &ping_pkt);
|
||||
if (err != ESP_OK)
|
||||
{
|
||||
ESP_LOGW(TAG, "Failed to send PING frame, closing connection for fd %d, error: %s", fd,
|
||||
esp_err_to_name(err));
|
||||
client_fd = -1;
|
||||
}
|
||||
}
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
}
|
||||
}
|
||||
}
|
||||
free(client_fds);
|
||||
vTaskDelete(NULL);
|
||||
}
|
||||
|
||||
static void uart_polling_task(void* arg)
|
||||
{
|
||||
@@ -123,16 +141,6 @@ static void uart_polling_task(void* arg)
|
||||
|
||||
while (1)
|
||||
{
|
||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||
int fd = client_fd;
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
|
||||
if (fd <= 0)
|
||||
{
|
||||
vTaskDelay(pdMS_TO_TICKS(100));
|
||||
continue;
|
||||
}
|
||||
|
||||
size_t available_len;
|
||||
uart_get_buffered_data_len(UART_NUM, &available_len);
|
||||
|
||||
@@ -144,7 +152,6 @@ static void uart_polling_task(void* arg)
|
||||
|
||||
size_t read_len = (available_len > BUF_SIZE) ? BUF_SIZE : available_len;
|
||||
int bytes_read = uart_read_bytes(UART_NUM, data_buf, read_len, pdMS_TO_TICKS(5));
|
||||
printf("-- %d\n", available_len);
|
||||
|
||||
if (bytes_read > 0)
|
||||
{
|
||||
@@ -155,9 +162,9 @@ static void uart_polling_task(void* arg)
|
||||
|
||||
StatusMessage message = StatusMessage_init_zero;
|
||||
message.which_payload = StatusMessage_uart_data_tag;
|
||||
struct bytes_arg arg = {.data = data_buf + offset, .len = chunk_size};
|
||||
struct bytes_arg a = {.data = data_buf + offset, .len = chunk_size};
|
||||
message.payload.uart_data.data.funcs.encode = &encode_bytes_callback;
|
||||
message.payload.uart_data.data.arg = &arg;
|
||||
message.payload.uart_data.data.arg = &a;
|
||||
|
||||
pb_ostream_t stream = pb_ostream_from_buffer(pb_buffer, sizeof(pb_buffer));
|
||||
if (!pb_encode(&stream, StatusMessage_fields, &message))
|
||||
@@ -229,22 +236,7 @@ static esp_err_t ws_handler(httpd_req_t* req)
|
||||
{
|
||||
if (req->method == HTTP_GET)
|
||||
{
|
||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||
if (client_fd > 0)
|
||||
{
|
||||
ESP_LOGW(TAG, "Another client tried to connect, but a session is already active. Rejecting.");
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
httpd_resp_send_err(req, HTTPD_403_FORBIDDEN, "Another client is already connected");
|
||||
return ESP_FAIL;
|
||||
}
|
||||
|
||||
int new_fd = httpd_req_to_sockfd(req);
|
||||
ESP_LOGI(TAG, "Accepting new websocket connection: %d", new_fd);
|
||||
client_fd = new_fd;
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
|
||||
xQueueReset(ws_queue);
|
||||
uart_flush_input(UART_NUM);
|
||||
ESP_LOGI(TAG, "Handshake done, the new connection was opened");
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
@@ -257,12 +249,6 @@ static esp_err_t ws_handler(httpd_req_t* req)
|
||||
if (ret != ESP_OK)
|
||||
{
|
||||
ESP_LOGW(TAG, "httpd_ws_recv_frame failed with error: %s", esp_err_to_name(ret));
|
||||
xSemaphoreTake(client_fd_mutex, portMAX_DELAY);
|
||||
if (httpd_req_to_sockfd(req) == client_fd)
|
||||
{
|
||||
client_fd = -1;
|
||||
}
|
||||
xSemaphoreGive(client_fd_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -293,7 +279,6 @@ void register_ws_endpoint(httpd_handle_t server)
|
||||
httpd_uri_t ws = {.uri = "/ws", .method = HTTP_GET, .handler = ws_handler, .user_ctx = NULL, .is_websocket = true};
|
||||
httpd_register_uri_handler(server, &ws);
|
||||
|
||||
client_fd_mutex = xSemaphoreCreateMutex();
|
||||
ws_queue = xQueueCreate(10, sizeof(struct ws_message));
|
||||
|
||||
xTaskCreate(uart_polling_task, "uart_polling_task", 1024 * 4, NULL, 8, NULL);
|
||||
@@ -307,7 +292,8 @@ void push_data_to_ws(const uint8_t* data, size_t len)
|
||||
msg.type = WS_MSG_STATUS;
|
||||
msg.data = malloc(len);
|
||||
if (!msg.data)
|
||||
{ ESP_LOGE(TAG, "Failed to allocate memory for status ws msg");
|
||||
{
|
||||
ESP_LOGE(TAG, "Failed to allocate memory for status ws msg");
|
||||
return;
|
||||
}
|
||||
memcpy(msg.data, data, len);
|
||||
|
||||
Reference in New Issue
Block a user