diff --git a/UtilityClient/Native/WebsocketClient.cs b/UtilityClient/Native/WebsocketClient.cs index e4ad5a3..16a41f5 100644 --- a/UtilityClient/Native/WebsocketClient.cs +++ b/UtilityClient/Native/WebsocketClient.cs @@ -1,17 +1,18 @@ using System.Net.WebSockets; using System.Text; +using System.Threading; namespace GoXLRUtilityClient.Native; public class WebsocketClient : IDisposable { private ClientWebSocket _client = new(); - private CancellationTokenSource _cancellationTokenSource = new(); private Task? _receiveMessageTask; private Task? _connectionTask; - private bool _isConnected; + private readonly SemaphoreSlim _connectionLock = new(1, 1); // For thread safety + private readonly SemaphoreSlim _sendLock = new(1, 1); // For thread safety on send public event EventHandler? OnConnected; public event EventHandler? OnDisconnected; @@ -21,163 +22,263 @@ public class WebsocketClient : IDisposable private async Task ReceiveMessageTask() { var memoryStream = new MemoryStream(); - var hasReceivedCloseMessage = false; var buffer = new byte[1024]; // Buffer to store received data - while (!this._cancellationTokenSource.IsCancellationRequested) + + try { - if (this._client.State != WebSocketState.Open) + while (!_cancellationTokenSource.Token.IsCancellationRequested) { - await Task.Delay(20); - continue; - } + if (_client.State != WebSocketState.Open) + { + await Task.Delay(20, _cancellationTokenSource.Token); + continue; + } - WebSocketReceiveResult? result = null; - try - { - result = await _client.ReceiveAsync(new ArraySegment(buffer), - this._cancellationTokenSource.Token); - } - catch (TaskCanceledException) - { - /* do nothing */ - } - catch (Exception e) - { - this.OnError?.Invoke(this, e); - } - - if (result is null) continue; - switch (result.MessageType) - { - case WebSocketMessageType.Text: - if (result.EndOfMessage && memoryStream.Position == 0) - { - var message = Encoding.UTF8.GetString(buffer, 0, result.Count); - this.OnMessage?.Invoke(this, message); - } - else - { - await memoryStream.WriteAsync(buffer, 0, result.Count); - if (result.EndOfMessage) + WebSocketReceiveResult? result = null; + try + { + result = await _client.ReceiveAsync(new ArraySegment(buffer), _cancellationTokenSource.Token); + } + catch (TaskCanceledException) + { + break; // Task was canceled, exit gracefully + } + catch (Exception e) + { + OnError?.Invoke(this, e); + break; + } + + if (result is null) continue; + switch (result.MessageType) + { + case WebSocketMessageType.Text: + if (result.EndOfMessage && memoryStream.Position == 0) { - var message = Encoding.UTF8.GetString(memoryStream.GetBuffer(), 0, (int)memoryStream.Position); - this.OnMessage?.Invoke(this, message); - memoryStream.SetLength(0); + var message = Encoding.UTF8.GetString(buffer, 0, result.Count); + OnMessage?.Invoke(this, message); } - } - break; - - case WebSocketMessageType.Close: - if (hasReceivedCloseMessage) break; - hasReceivedCloseMessage = true; - - // If the server initiates the close handshake, handle it - await Task.Run(DisconnectAsync); - break; - - default: - // This won't occur with the Utility, but we need to trigger DisconnectAsync to perform tidying up! - await _client.CloseAsync(WebSocketCloseStatus.ProtocolError, "Only Text is supported.", CancellationToken.None); - this.OnDisconnected?.Invoke(this, "Connection closed because server tried to send binary or invalid message."); - break; + else + { + await memoryStream.WriteAsync(buffer, 0, result.Count); + if (result.EndOfMessage) + { + var message = Encoding.UTF8.GetString(memoryStream.GetBuffer(), 0, (int)memoryStream.Position); + OnMessage?.Invoke(this, message); + memoryStream.SetLength(0); + } + } + break; + + case WebSocketMessageType.Close: + await HandleServerInitiatedClose(); + return; + + default: + await _client.CloseAsync(WebSocketCloseStatus.ProtocolError, "Only Text is supported.", CancellationToken.None); + OnDisconnected?.Invoke(this, "Connection closed because server tried to send binary or invalid message."); + return; + } } } + catch (OperationCanceledException) when (_cancellationTokenSource.Token.IsCancellationRequested) + { + // Ignore cancellation exceptions + } + catch (Exception ex) + { + OnError?.Invoke(this, ex); + } + finally + { + memoryStream.Dispose(); // Ensure memory stream is disposed + } + } + + private async Task HandleServerInitiatedClose() + { + if (_client.State == WebSocketState.CloseReceived) + { + await _client.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closing on server request", CancellationToken.None); + await DisconnectAsync(); // Ensure proper cleanup + } } private async Task ConnectionTask() { - while (!this._cancellationTokenSource.IsCancellationRequested) + try { - switch (this._client.State) + while (!_cancellationTokenSource.Token.IsCancellationRequested) { - case WebSocketState.Open: - if (!this._isConnected) this.OnConnected?.Invoke(this, "Connected."); - this._isConnected = true; - break; - - case WebSocketState.Aborted: - case WebSocketState.Closed: - // Trigger an internal disconnect to clean resources. - await Task.Run(DisconnectAsync); - break; - } + switch (_client.State) + { + case WebSocketState.Open: + if (!_isConnected) + { + OnConnected?.Invoke(this, "Connected."); + _isConnected = true; + } + break; - await Task.Delay(200); + case WebSocketState.Aborted: + case WebSocketState.Closed: + await DisconnectAsync(); + return; // Exit the loop and stop reconnection attempts + } + + await Task.Delay(200, _cancellationTokenSource.Token); + } + } + catch (OperationCanceledException) when (_cancellationTokenSource.Token.IsCancellationRequested) + { + // Ignore cancellation exceptions + } + catch (Exception ex) + { + OnError?.Invoke(this, ex); } } public bool IsConnectionAlive() { - return this._client.State == WebSocketState.Open; + return _client.State == WebSocketState.Open; } public async Task ConnectAsync(string uri) { - return await this.ConnectAsync(new Uri(uri)); + return await ConnectAsync(new Uri(uri)); } protected async Task ConnectAsync(Uri uri) - { - this._receiveMessageTask = ReceiveMessageTask(); - this._connectionTask = ConnectionTask(); - await this._client.ConnectAsync(uri, this._cancellationTokenSource.Token); - return this._client.State == WebSocketState.Open; + { + await _connectionLock.WaitAsync(); // Ensure only one connection at a time + try + { + if (_client.State == WebSocketState.Open) return true; // Already connected + + _cancellationTokenSource = new CancellationTokenSource(); // Reset cancellation token + + _receiveMessageTask = ReceiveMessageTask(); + _connectionTask = ConnectionTask(); + + await _client.ConnectAsync(uri, _cancellationTokenSource.Token); + + return _client.State == WebSocketState.Open; + } + catch (Exception ex) + { + OnError?.Invoke(this, ex); + return false; + } + finally + { + _connectionLock.Release(); // Release the lock + } } protected async Task DisconnectAsync() { - // Only attempt to close the socket if it's not already closed - if (this._client.State != WebSocketState.Aborted && this._client.State != WebSocketState.Closed) { - await this._client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None); - } - this.OnDisconnected?.Invoke(this, "Connection closed."); - - await _cancellationTokenSource.CancelAsync(); - var shutdownSuccessful = Task.WaitAll(new[] { this._receiveMessageTask!, this._connectionTask! }, - TimeSpan.FromSeconds(5)); - if (!shutdownSuccessful) - { - this.OnError?.Invoke(this, new Exception("Failed to dispose tasks.")); - return; - } - - this._receiveMessageTask!.Dispose(); - this._connectionTask!.Dispose(); - - // Dispose of, and create a new client / Token for future connections - this._client.Dispose(); - this._client = new(); - this._cancellationTokenSource.Dispose(); - this._cancellationTokenSource = new(); - - // Flag the connection as disconnected - this._isConnected = false; + await _connectionLock.WaitAsync(); // Ensure thread safety for disconnection + try + { + if (_client.State != WebSocketState.Aborted && _client.State != WebSocketState.Closed) + { + try + { + await _client.CloseAsync(WebSocketCloseStatus.NormalClosure, "Disconnecting", CancellationToken.None); + } + catch (Exception ex) + { + OnError?.Invoke(this, ex); + } + } + + OnDisconnected?.Invoke(this, "Connection closed."); + + _cancellationTokenSource.Cancel(); // Cancel any ongoing operations + + await Task.WhenAny(Task.WhenAll(_receiveMessageTask ?? Task.CompletedTask, _connectionTask ?? Task.CompletedTask), Task.Delay(5000)); + + DisposeTasks(); + + // Dispose of WebSocket client and recreate it for next use + _client.Dispose(); + _client = new ClientWebSocket(); + + // Dispose and recreate the cancellation token source + _cancellationTokenSource.Dispose(); + _cancellationTokenSource = new CancellationTokenSource(); + + _isConnected = false; + } + catch (Exception ex) + { + OnError?.Invoke(this, new Exception("An error occurred during disconnection.", ex)); + } + finally + { + _connectionLock.Release(); // Release the lock + } + } + + private void DisposeTasks() + { + try + { + _receiveMessageTask?.Dispose(); + _receiveMessageTask = null; + + _connectionTask?.Dispose(); + _connectionTask = null; + } + catch (Exception ex) + { + OnError?.Invoke(this, new Exception("An error occurred while disposing tasks.", ex)); + } } protected async Task SendMessage(string message) { - await this._client.SendAsync( - new ArraySegment(Encoding.UTF8.GetBytes(message)), - WebSocketMessageType.Text, - true, - this._cancellationTokenSource.Token); + await _sendLock.WaitAsync(); // Ensure only one send operation at a time + try + { + await _client.SendAsync( + new ArraySegment(Encoding.UTF8.GetBytes(message)), + WebSocketMessageType.Text, + true, + _cancellationTokenSource.Token); + } + catch (Exception ex) + { + OnError?.Invoke(this, ex); + } + finally + { + _sendLock.Release(); // Release the lock + } } - + public void Dispose() { - this._cancellationTokenSource.Cancel(); + if (!_cancellationTokenSource.IsCancellationRequested) + { + _cancellationTokenSource.Cancel(); + } - var shutdownSuccessful = Task.WaitAll(new[] { this._receiveMessageTask!, this._connectionTask! }, - TimeSpan.FromSeconds(5)); - if (!shutdownSuccessful) + try { - this.OnError?.Invoke(this, new Exception("Failed to dispose tasks.")); - return; + Task.WaitAll(new[] { _receiveMessageTask ?? Task.CompletedTask, _connectionTask ?? Task.CompletedTask }, TimeSpan.FromSeconds(5)); } - - this._cancellationTokenSource.Dispose(); - this._client.Dispose(); - this._receiveMessageTask!.Dispose(); - this._connectionTask!.Dispose(); + catch (Exception ex) + { + OnError?.Invoke(this, new Exception("Failed to dispose tasks.", ex)); + } + + _cancellationTokenSource.Dispose(); + _client.Dispose(); + _receiveMessageTask?.Dispose(); + _connectionTask?.Dispose(); + _connectionLock.Dispose(); // Dispose of the semaphore + _sendLock.Dispose(); // Dispose of the semaphore } -} \ No newline at end of file +}