Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/WebSocket/WebSocket.ino
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ void loop() {
}

if (now - lastHeap >= 2000) {
// cleanup disconnected clients or too many clients
ws.cleanupClients();
Serial.printf("Connected clients: %u / %u total\n", ws.count(), ws.getClients().size());

// this can be called to also set a soft limit on the number of connected clients
ws.cleanupClients(2); // no more than 2 clients

#ifdef ESP32
Serial.printf("Free heap: %" PRIu32 "\n", ESP.getFreeHeap());
Expand Down
43 changes: 35 additions & 8 deletions src/AsyncWebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, Async
AsyncWebSocketClient::~AsyncWebSocketClient() {
{
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
_messageQueue.clear();
_controlQueue.clear();
Expand All @@ -351,7 +351,7 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
_lastMessageTime = millis();

#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
std::unique_lock<std::recursive_mutex> lock(_lock);
#endif

if (!_controlQueue.empty()) {
Expand All @@ -362,6 +362,14 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
_controlQueue.pop_front();
_status = WS_DISCONNECTED;
if (_client) {
#ifdef ESP32
/*
Unlocking has to be called before return execution otherwise std::unique_lock ::~unique_lock() will get an exception pthread_mutex_unlock.
Due to _client->close(true) shall call the callback function _onDisconnect()
The calling flow _onDisconnect() --> _handleDisconnect() --> ~AsyncWebSocketClient()
*/
lock.unlock();
#endif
_client->close(true);
}
return;
Expand All @@ -385,7 +393,7 @@ void AsyncWebSocketClient::_onPoll() {
}

#ifdef ESP32
std::unique_lock<std::mutex> lock(_lock);
std::unique_lock<std::recursive_mutex> lock(_lock);
#endif
if (_client && _client->canSend() && (!_controlQueue.empty() || !_messageQueue.empty())) {
_runQueue();
Expand Down Expand Up @@ -415,21 +423,21 @@ void AsyncWebSocketClient::_runQueue() {

bool AsyncWebSocketClient::queueIsFull() const {
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
return (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES) || (_status != WS_CONNECTED);
}

size_t AsyncWebSocketClient::queueLen() const {
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
return _messageQueue.size();
}

bool AsyncWebSocketClient::canSend() const {
#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
return _messageQueue.size() < WS_MAX_QUEUED_MESSAGES;
}
Expand All @@ -440,7 +448,7 @@ bool AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, si
}

#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif

_controlQueue.emplace_back(opcode, data, len, mask);
Expand All @@ -458,14 +466,22 @@ bool AsyncWebSocketClient::_queueMessage(AsyncWebSocketSharedBuffer buffer, uint
}

#ifdef ESP32
std::lock_guard<std::mutex> lock(_lock);
std::unique_lock<std::recursive_mutex> lock(_lock);
#endif

if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES) {
if (closeWhenFull) {
_status = WS_DISCONNECTED;

if (_client) {
#ifdef ESP32
/*
Unlocking has to be called before return execution otherwise std::unique_lock ::~unique_lock() will get an exception pthread_mutex_unlock.
Due to _client->close(true) shall call the callback function _onDisconnect()
The calling flow _onDisconnect() --> _handleDisconnect() --> ~AsyncWebSocketClient()
*/
lock.unlock();
#endif
_client->close(true);
}

Expand Down Expand Up @@ -551,6 +567,7 @@ void AsyncWebSocketClient::_onTimeout(uint32_t time) {
void AsyncWebSocketClient::_onDisconnect() {
// Serial.println("onDis");
_client = nullptr;
_server->_handleDisconnect(this);
}

void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
Expand Down Expand Up @@ -857,6 +874,16 @@ AsyncWebSocketClient *AsyncWebSocket::_newClient(AsyncWebServerRequest *request)
return &_clients.back();
}

void AsyncWebSocket::_handleDisconnect(AsyncWebSocketClient *client) {
const auto client_id = client->id();
const auto iter = std::find_if(std::begin(_clients), std::end(_clients), [client_id](const AsyncWebSocketClient &c) {
return c.id() == client_id;
});
if (iter != std::end(_clients)) {
_clients.erase(iter);
}
}

bool AsyncWebSocket::availableForWriteAll() {
return std::none_of(std::begin(_clients), std::end(_clients), [](const AsyncWebSocketClient &c) {
return c.queueIsFull();
Expand Down
3 changes: 2 additions & 1 deletion src/AsyncWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class AsyncWebSocketClient {
uint32_t _clientId;
AwsClientStatus _status;
#ifdef ESP32
mutable std::mutex _lock;
mutable std::recursive_mutex _lock;
#endif
std::deque<AsyncWebSocketControl> _controlQueue;
std::deque<AsyncWebSocketMessage> _messageQueue;
Expand Down Expand Up @@ -385,6 +385,7 @@ class AsyncWebSocket : public AsyncWebHandler {
return _cNextId++;
}
AsyncWebSocketClient *_newClient(AsyncWebServerRequest *request);
void _handleDisconnect(AsyncWebSocketClient *client);
void _handleEvent(AsyncWebSocketClient *client, AwsEventType type, void *arg, uint8_t *data, size_t len);
bool canHandle(AsyncWebServerRequest *request) const override final;
void handleRequest(AsyncWebServerRequest *request) override final;
Expand Down