diff --git a/main/service/webserver.c b/main/service/webserver.c index 4259462..1c3d954 100644 --- a/main/service/webserver.c +++ b/main/service/webserver.c @@ -68,6 +68,7 @@ void start_webserver(void) config.stack_size = 1024 * 8; config.max_uri_handlers = 10; config.task_priority = 12; + config.max_open_sockets = 7; if (httpd_start(&server, &config) != ESP_OK) { diff --git a/main/service/ws.c b/main/service/ws.c index f75b6bc..a1afc99 100644 --- a/main/service/ws.c +++ b/main/service/ws.c @@ -22,9 +22,6 @@ static const char* TAG = "ws-uart"; -static int client_fd = -1; -static SemaphoreHandle_t client_fd_mutex; - enum ws_message_type { WS_MSG_STATUS, @@ -44,8 +41,10 @@ struct bytes_arg size_t len; }; +#define MAX_CLIENT 7 static QueueHandle_t ws_queue; static QueueHandle_t uart_event_queue; +static int client_fds[MAX_CLIENT]; static bool encode_bytes_callback(pb_ostream_t* stream, const pb_field_t* field, void* const* arg) { @@ -61,18 +60,20 @@ static void unified_ws_sender_task(void* arg) { httpd_handle_t server = (httpd_handle_t)arg; struct ws_message msg; - const TickType_t PING_INTERVAL = pdMS_TO_TICKS(5000); while (1) { if (xQueueReceive(ws_queue, &msg, PING_INTERVAL)) { - xSemaphoreTake(client_fd_mutex, portMAX_DELAY); - int fd = client_fd; - - if (fd <= 0) + size_t clients = MAX_CLIENT; + if (httpd_get_client_list(server, &clients, client_fds) != ESP_OK) + { + free(msg.data); + continue; + } + + if (clients == 0) { - xSemaphoreGive(client_fd_mutex); free(msg.data); continue; } @@ -82,38 +83,55 @@ static void unified_ws_sender_task(void* arg) ws_pkt.len = msg.len; ws_pkt.type = HTTPD_WS_TYPE_BINARY; - esp_err_t err = httpd_ws_send_frame_async(server, fd, &ws_pkt); - free(msg.data); - - if (err != ESP_OK) + for (size_t i = 0; i < clients; ++i) { - ESP_LOGW(TAG, "unified_ws_sender_task: async send failed for fd %d, error: %s", fd, - esp_err_to_name(err)); - client_fd = -1; + int fd = client_fds[i]; + if (httpd_ws_get_fd_info(server, fd) == HTTPD_WS_CLIENT_WEBSOCKET) + { + esp_err_t err = httpd_ws_send_frame_async(server, fd, &ws_pkt); + if (err != ESP_OK) + { + ESP_LOGW(TAG, "unified_ws_sender_task: async send failed for fd %d, error: %s", fd, + esp_err_to_name(err)); + } + } } - - xSemaphoreGive(client_fd_mutex); + free(msg.data); } else { - xSemaphoreTake(client_fd_mutex, portMAX_DELAY); - int fd = client_fd; - if (fd > 0) + size_t clients = max_clients; + if (httpd_get_client_list(server, &clients, client_fds) != ESP_OK) { - httpd_ws_frame_t ping_pkt = {0}; - ping_pkt.type = HTTPD_WS_TYPE_PING; - ping_pkt.final = true; - esp_err_t err = httpd_ws_send_frame_async(server, fd, &ping_pkt); - if (err != ESP_OK) + continue; + } + + if (clients == 0) + { + continue; + } + + httpd_ws_frame_t ping_pkt = {0}; + ping_pkt.type = HTTPD_WS_TYPE_PING; + ping_pkt.final = true; + + for (size_t i = 0; i < clients; ++i) + { + int fd = client_fds[i]; + if (httpd_ws_get_fd_info(server, fd) == HTTPD_WS_CLIENT_WEBSOCKET) { - ESP_LOGW(TAG, "Failed to send PING frame, closing connection for fd %d, error: %s", fd, - esp_err_to_name(err)); - client_fd = -1; + esp_err_t err = httpd_ws_send_frame_async(server, fd, &ping_pkt); + if (err != ESP_OK) + { + ESP_LOGW(TAG, "Failed to send PING frame, closing connection for fd %d, error: %s", fd, + esp_err_to_name(err)); + } } } - xSemaphoreGive(client_fd_mutex); } } + free(client_fds); + vTaskDelete(NULL); } static void uart_polling_task(void* arg) @@ -123,16 +141,6 @@ static void uart_polling_task(void* arg) while (1) { - xSemaphoreTake(client_fd_mutex, portMAX_DELAY); - int fd = client_fd; - xSemaphoreGive(client_fd_mutex); - - if (fd <= 0) - { - vTaskDelay(pdMS_TO_TICKS(100)); - continue; - } - size_t available_len; uart_get_buffered_data_len(UART_NUM, &available_len); @@ -144,7 +152,6 @@ static void uart_polling_task(void* arg) size_t read_len = (available_len > BUF_SIZE) ? BUF_SIZE : available_len; int bytes_read = uart_read_bytes(UART_NUM, data_buf, read_len, pdMS_TO_TICKS(5)); - printf("-- %d\n", available_len); if (bytes_read > 0) { @@ -155,9 +162,9 @@ static void uart_polling_task(void* arg) StatusMessage message = StatusMessage_init_zero; message.which_payload = StatusMessage_uart_data_tag; - struct bytes_arg arg = {.data = data_buf + offset, .len = chunk_size}; + struct bytes_arg a = {.data = data_buf + offset, .len = chunk_size}; message.payload.uart_data.data.funcs.encode = &encode_bytes_callback; - message.payload.uart_data.data.arg = &arg; + message.payload.uart_data.data.arg = &a; pb_ostream_t stream = pb_ostream_from_buffer(pb_buffer, sizeof(pb_buffer)); if (!pb_encode(&stream, StatusMessage_fields, &message)) @@ -229,22 +236,7 @@ static esp_err_t ws_handler(httpd_req_t* req) { if (req->method == HTTP_GET) { - xSemaphoreTake(client_fd_mutex, portMAX_DELAY); - if (client_fd > 0) - { - ESP_LOGW(TAG, "Another client tried to connect, but a session is already active. Rejecting."); - xSemaphoreGive(client_fd_mutex); - httpd_resp_send_err(req, HTTPD_403_FORBIDDEN, "Another client is already connected"); - return ESP_FAIL; - } - - int new_fd = httpd_req_to_sockfd(req); - ESP_LOGI(TAG, "Accepting new websocket connection: %d", new_fd); - client_fd = new_fd; - xSemaphoreGive(client_fd_mutex); - - xQueueReset(ws_queue); - uart_flush_input(UART_NUM); + ESP_LOGI(TAG, "Handshake done, the new connection was opened"); return ESP_OK; } @@ -257,12 +249,6 @@ static esp_err_t ws_handler(httpd_req_t* req) if (ret != ESP_OK) { ESP_LOGW(TAG, "httpd_ws_recv_frame failed with error: %s", esp_err_to_name(ret)); - xSemaphoreTake(client_fd_mutex, portMAX_DELAY); - if (httpd_req_to_sockfd(req) == client_fd) - { - client_fd = -1; - } - xSemaphoreGive(client_fd_mutex); return ret; } @@ -293,7 +279,6 @@ void register_ws_endpoint(httpd_handle_t server) httpd_uri_t ws = {.uri = "/ws", .method = HTTP_GET, .handler = ws_handler, .user_ctx = NULL, .is_websocket = true}; httpd_register_uri_handler(server, &ws); - client_fd_mutex = xSemaphoreCreateMutex(); ws_queue = xQueueCreate(10, sizeof(struct ws_message)); xTaskCreate(uart_polling_task, "uart_polling_task", 1024 * 4, NULL, 8, NULL); @@ -307,7 +292,8 @@ void push_data_to_ws(const uint8_t* data, size_t len) msg.type = WS_MSG_STATUS; msg.data = malloc(len); if (!msg.data) - { ESP_LOGE(TAG, "Failed to allocate memory for status ws msg"); + { + ESP_LOGE(TAG, "Failed to allocate memory for status ws msg"); return; } memcpy(msg.data, data, len);