From 5bdfac6496088b415b8fa8b6882add290a43b1d7 Mon Sep 17 00:00:00 2001 From: Osman Hadzic Date: Sun, 17 Mar 2024 22:46:21 +0100 Subject: [PATCH 1/3] Update MQTTnet library Update MQTTnet library from 3.1.2 to version 4.1.4.563. Resolving braking changes with namespaces. Replace `UseApplicationMessageReceivedHandler` and `UseDisconnectedHandler` handlers with events `ApplicationMessageReceivedAsync` and `DisconnectedAsync`. Implement enum `MqttQualityOfServiceLevel` to replace the int value Qos. Signed-off-by: Osman Hadzic --- .../AstarteDeviceSDKCSharp.csproj | 2 +- .../Crypto/AstarteCryptoStore.cs | 6 ++-- .../AstarteMutualTLSParametersFactory.cs | 13 ++++++--- .../Crypto/IAstarteCryptoStore.cs | 2 +- .../Transport/MQTT/AstarteMqttTransport.cs | 28 +++++++++++++------ .../Transport/MQTT/AstarteMqttV1Transport.cs | 27 ++++++++++-------- .../Transport/MQTT/IMqttConnectionInfo.cs | 4 +-- ...tualSSLAuthenticationMqttConnectionInfo.cs | 7 ++--- 8 files changed, 54 insertions(+), 35 deletions(-) diff --git a/AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj b/AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj index 47e1e70..9074191 100644 --- a/AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj +++ b/AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj @@ -34,7 +34,7 @@ SPDX-License-Identifier: Apache-2.0 --> all - + diff --git a/AstarteDeviceSDKCSharp/Crypto/AstarteCryptoStore.cs b/AstarteDeviceSDKCSharp/Crypto/AstarteCryptoStore.cs index 51af932..42da34d 100644 --- a/AstarteDeviceSDKCSharp/Crypto/AstarteCryptoStore.cs +++ b/AstarteDeviceSDKCSharp/Crypto/AstarteCryptoStore.cs @@ -18,7 +18,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -using MQTTnet.Client.Options; +using MQTTnet.Client; using System.Security.Cryptography; using System.Security.Cryptography.X509Certificates; using System.Text; @@ -55,13 +55,13 @@ private void LoadNewCertificate() } - public void SaveCertificateIfNotExist(X509Certificate2 x509Certificate) + public void SaveCertificateIfNotExist(X509Certificate2 x509Certificate2) { //Save certificate to file try { File.WriteAllText(Path.Combine(_cryptoStoreDir, "device.crt"), - new(PemEncoding.Write("CERTIFICATE", x509Certificate.GetRawCertData()).ToArray())); + new(PemEncoding.Write("CERTIFICATE", x509Certificate2.GetRawCertData()).ToArray())); } catch (DirectoryNotFoundException ex) { diff --git a/AstarteDeviceSDKCSharp/Crypto/AstarteMutualTLSParametersFactory.cs b/AstarteDeviceSDKCSharp/Crypto/AstarteMutualTLSParametersFactory.cs index 7aef1a9..1914cda 100644 --- a/AstarteDeviceSDKCSharp/Crypto/AstarteMutualTLSParametersFactory.cs +++ b/AstarteDeviceSDKCSharp/Crypto/AstarteMutualTLSParametersFactory.cs @@ -18,12 +18,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -using MQTTnet.Client.Options; +using MQTTnet.Client; using System.Security.Cryptography.X509Certificates; namespace AstarteDeviceSDKCSharp.Crypto { - public class AstarteMutualTLSParametersFactory : MqttClientOptionsBuilderTlsParameters + public class AstarteMutualTLSParametersFactory { private readonly MqttClientOptionsBuilderTlsParameters _tlsOptions; @@ -33,14 +33,19 @@ public AstarteMutualTLSParametersFactory(IAstarteCryptoStore cryptoStore) : base _tlsOptions = new MqttClientOptionsBuilderTlsParameters { UseTls = true, - Certificates = new List + Certificates = new List { cryptoStore.GetCertificate() }, IgnoreCertificateChainErrors = cryptoStore.IgnoreSSLErrors, IgnoreCertificateRevocationErrors = cryptoStore.IgnoreSSLErrors, SslProtocol = System.Security.Authentication.SslProtocols.Tls12, - AllowUntrustedCertificates = cryptoStore.IgnoreSSLErrors + AllowUntrustedCertificates = cryptoStore.IgnoreSSLErrors, + CertificateValidationHandler = eventArgs => + { + eventArgs.Certificate = cryptoStore.GetCertificate(); + return true; + } }; } diff --git a/AstarteDeviceSDKCSharp/Crypto/IAstarteCryptoStore.cs b/AstarteDeviceSDKCSharp/Crypto/IAstarteCryptoStore.cs index 0037232..9b8592c 100644 --- a/AstarteDeviceSDKCSharp/Crypto/IAstarteCryptoStore.cs +++ b/AstarteDeviceSDKCSharp/Crypto/IAstarteCryptoStore.cs @@ -18,7 +18,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -using MQTTnet.Client.Options; +using MQTTnet.Client; using System.Security.Cryptography.X509Certificates; namespace AstarteDeviceSDKCSharp.Crypto diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs index 81cbaa4..169fd39 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs @@ -26,12 +26,10 @@ using AstarteDeviceSDKCSharp.Utilities; using MQTTnet; using MQTTnet.Client; -using MQTTnet.Client.Connecting; -using MQTTnet.Client.Disconnecting; -using MQTTnet.Client.Publishing; using MQTTnet.Exceptions; using System.Diagnostics; using System.IO.Compression; +using System.Runtime.CompilerServices; using System.Text; namespace AstarteDeviceSDKCSharp.Transport.MQTT @@ -94,8 +92,17 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent) Trace.WriteLine("Transport Connected"); } - _client.UseApplicationMessageReceivedHandler(OnMessageReceive); - _client.UseDisconnectedHandler(OnDisconnectAsync); + if (_client is not null) + { + _client.ApplicationMessageReceivedAsync += e => + { + OnMessageReceive(e); + return Task.CompletedTask; + }; + + _client.DisconnectedAsync += OnDisconnectAsync; + + } } @@ -114,8 +121,8 @@ public override async Task Connect() _client = await InitClientAsync(); } - var result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(), - CancellationToken.None); + MqttClientConnectResult result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(), + CancellationToken.None).ConfigureAwait(false); if (result.ResultCode == MqttClientConnectResultCode.Success) { @@ -192,6 +199,11 @@ private async Task SendEmptyCacheAsync() .WithRetainFlag(false) .Build(); + if (_client is null) + { + return; + } + MqttClientPublishResult result = await _client.PublishAsync(applicationMessage); if (result.ReasonCode != MqttClientPublishReasonCode.Success) { @@ -200,7 +212,7 @@ private async Task SendEmptyCacheAsync() } } - private Task OnDisconnectAsync(MqttClientDisconnectedEventArgs e) + Task OnDisconnectAsync(MqttClientDisconnectedEventArgs e) { if (_astarteTransportEventListener != null) { diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs index 8556dd6..ecb0d37 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs @@ -26,8 +26,8 @@ using AstarteDeviceSDKCSharp.Utilities; using MQTTnet; using MQTTnet.Client; -using MQTTnet.Client.Publishing; using MQTTnet.Exceptions; +using MQTTnet.Protocol; using System.Text; using static AstarteDeviceSDKCSharp.Protocol.AstarteInterfaceDatastreamMapping; @@ -69,7 +69,7 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface try { - await DoSendMqttMessage(topic, payload, qos); + await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); } catch (MqttCommunicationException ex) { @@ -85,7 +85,7 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface } - private async Task DoSendMqttMessage(string topic, byte[] payload, int qos) + private async Task DoSendMqttMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qos) { var applicationMessage = new MqttApplicationMessageBuilder() .WithTopic(topic) @@ -96,13 +96,16 @@ private async Task DoSendMqttMessage(string topic, byte[] payload, int qos) try { + if (_client is not null) + { + MqttClientPublishResult result = await _client.PublishAsync(applicationMessage); - MqttClientPublishResult result = await _client.PublishAsync(applicationMessage); + if (result.ReasonCode != MqttClientPublishReasonCode.Success) + { + throw new AstarteTransportException + ($"Error publishing on MQTT. Code: {result.ReasonCode}"); + } - if (result.ReasonCode != MqttClientPublishReasonCode.Success) - { - throw new AstarteTransportException - ($"Error publishing on MQTT. Code: {result.ReasonCode}"); } } @@ -140,7 +143,7 @@ public override async Task SendIntrospection() .Remove(introspectionStringBuilder.Length - 1, 1); string introspection = introspectionStringBuilder.ToString(); - await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), 2); + await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), (MqttQualityOfServiceLevel)2); } public override async Task SendIndividualValue(AstarteInterface astarteInterface, @@ -167,7 +170,7 @@ public override async Task SendAggregate(AstarteAggregateDatastreamInterface ast try { - await DoSendMqttMessage(topic, payload, qos); + await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); } catch (MqttCommunicationException e) { @@ -251,7 +254,7 @@ public override void RetryFailedMessages() private async Task DoSendMessage(IAstarteFailedMessage failedMessage) { await DoSendMqttMessage(failedMessage.GetTopic(), failedMessage.GetPayload(), - failedMessage.GetQos()); + (MqttQualityOfServiceLevel)failedMessage.GetQos()); } private int QosFromReliability(AstarteInterfaceDatastreamMapping mapping) @@ -322,7 +325,7 @@ private async void DoSendMqttMessage(IAstarteFailedMessage failedMessage) { await DoSendMqttMessage(failedMessage.GetTopic(), failedMessage.GetPayload(), - failedMessage.GetQos()); + (MqttQualityOfServiceLevel)failedMessage.GetQos()); } private void HandleDatastreamFailedPublish(MqttCommunicationException e, diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs index 52bc0cf..57bc3a5 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs @@ -18,7 +18,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -using MQTTnet.Client.Options; +using MQTTnet.Client; namespace AstarteDeviceSDKCSharp.Transport.MQTT @@ -29,6 +29,6 @@ public interface IMqttConnectionInfo string GetClientId(); - IMqttClientOptions GetMqttConnectOptions(); + MqttClientOptions GetMqttConnectOptions(); } } diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs index e224bfe..61f6b08 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs @@ -18,7 +18,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -using MQTTnet.Client.Options; +using MQTTnet.Client; namespace AstarteDeviceSDKCSharp.Transport.MQTT { @@ -26,7 +26,7 @@ public class MutualSSLAuthenticationMqttConnectionInfo : IMqttConnectionInfo { private readonly Uri _brokerUrl; - private readonly IMqttClientOptions _mqttConnectOptions; + private readonly MqttClientOptions _mqttConnectOptions; private readonly string _clientId = string.Empty; public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRealm, @@ -38,7 +38,6 @@ public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRe .WithTcpServer(_brokerUrl.Host, _brokerUrl.Port) .WithTls(tlsOptions) .WithCleanSession(false) - .WithCommunicationTimeout(TimeSpan.FromSeconds(60)) .WithKeepAlivePeriod(TimeSpan.FromSeconds(60)) .WithSessionExpiryInterval(0) .Build(); @@ -50,6 +49,6 @@ public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRe public string GetClientId() => _clientId; - public IMqttClientOptions GetMqttConnectOptions() => _mqttConnectOptions; + public MqttClientOptions GetMqttConnectOptions() => _mqttConnectOptions; } } From 78a269434c27ffd82449376862844d33a25014a5 Mon Sep 17 00:00:00 2001 From: Osman Hadzic Date: Mon, 18 Mar 2024 08:49:01 +0100 Subject: [PATCH 2/3] Refactor strategy for fallback messages Handling exceptions for deadlock threads. Implement `TryPingAsync` for check is broker available for sending messages. Add timeout time to device constructor, timeOut duration for the connection. Implement new field in constructors E2E and Example. Add timeout for HTTP request on Pairing API. Signed-off-by: Osman Hadzic --- .../AstartePairingServiceTest.cs | 14 ++++----- .../AstartePairingHandler.cs | 8 +++-- .../AstartePairingService.cs | 14 +++++---- .../Device/AstarteDevice.cs | 20 +++++++++++-- .../Transport/AstarteTransportFactory.cs | 5 ++-- .../Transport/MQTT/AstarteMqttTransport.cs | 28 ++++++++++++----- .../Transport/MQTT/AstarteMqttV1Transport.cs | 30 +++++++++++++++---- .../Transport/MQTT/IMqttConnectionInfo.cs | 2 ++ ...tualSSLAuthenticationMqttConnectionInfo.cs | 7 ++++- .../Utilities/AstarteDeviceSingleton.cs | 1 + AstarteDeviceSDKExample/Program.cs | 9 ++++-- 11 files changed, 103 insertions(+), 35 deletions(-) diff --git a/AstarteDeviceSDKCSharp.Tests/AstartePairingServiceTest.cs b/AstarteDeviceSDKCSharp.Tests/AstartePairingServiceTest.cs index f482eae..1085779 100644 --- a/AstarteDeviceSDKCSharp.Tests/AstartePairingServiceTest.cs +++ b/AstarteDeviceSDKCSharp.Tests/AstartePairingServiceTest.cs @@ -49,7 +49,7 @@ public async Task TestThrowExceptionOnUnsuccessfulRegisterDeviceWithJwtTokenAsyn .WithHeader("Content-Type", "application/json; charset=utf-8") ); - _service = new AstartePairingService($"{_server.Urls[0]}", "test"); + _service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500)); //Assert _ = await Assert.ThrowsAsync(() => //Act @@ -75,7 +75,7 @@ public async Task TestThrowExceptionOnEmptyResponseDeviceWithJwtTokenAsync() + "}") ); ; - _service = new AstartePairingService($"{_server.Urls[0]}", "test"); + _service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500)); //Assert _ = await Assert.ThrowsAsync(() => @@ -116,7 +116,7 @@ public async Task TestSuccessfullRegisterDeviceWithJwtTokenAsync() .WithHeader("Content-Type", "application/json; charset=utf-8") .WithBody(responseBody) ); - _service = new AstartePairingService($"{_server.Urls[0]}", "test"); + _service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500)); //Act string credentialSecret = await _service.RegisterDeviceWithJwtToken(deviceId, expectedCredentialSecret); @@ -151,7 +151,7 @@ public async Task TestThrowExceptionOnUnsuccessfulRegisterDeviceWithPrivateKeyAs .WithHeader("Content-Type", "application/json; charset=utf-8") ); - _service = new AstartePairingService($"{_server.Urls[0]}", "test"); + _service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500)); //Assert _ = await Assert.ThrowsAsync(() => //Act @@ -198,7 +198,7 @@ public async Task TestSuccessfullRegisterDeviceWithPrivateKeyAsync() "JdorKpZMgsh6rzLQ=="); string privateKey = new(PemEncoding.Write("EC PRIVATE KEY", array).ToArray()); - _service = new AstartePairingService($"{_server.Urls[0]}", "test"); + _service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500)); var myPath = Path.GetDirectoryName( System.Reflection.Assembly.GetExecutingAssembly().Location); myPath = Path.Combine(myPath, "key.pem"); @@ -242,7 +242,7 @@ public void TestPairingServiceUrlConstructor_Local() Uri _expectedPairingUrl = new Uri("http://localhost:4003"); //Act - _service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test"); + _service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test", TimeSpan.FromMilliseconds(500)); Uri _actualPairingUrl = _service.PairingUrl(); @@ -260,7 +260,7 @@ public void TestPairingServiceUrlConstructor_Custom() Uri _expectedPairingUrl = new Uri("https://astarte.instance.test/pairing"); //Act - _service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test"); + _service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test", TimeSpan.FromMilliseconds(500)); Uri _actualPairingUrl = _service.PairingUrl(); diff --git a/AstarteDeviceSDKCSharp/AstartePairingHandler.cs b/AstarteDeviceSDKCSharp/AstartePairingHandler.cs index 3e973c1..9f4b1ce 100644 --- a/AstarteDeviceSDKCSharp/AstartePairingHandler.cs +++ b/AstarteDeviceSDKCSharp/AstartePairingHandler.cs @@ -33,15 +33,17 @@ public class AstartePairingHandler readonly AstarteCryptoStore _cryptoStore; private List? _transports; private X509Certificate2? _certificate; + private TimeSpan _timeOut; public AstartePairingHandler(string pairingUrl, string astarteRealm, string deviceId, - string credentialSecret, AstarteCryptoStore astarteCryptoStore) + string credentialSecret, AstarteCryptoStore astarteCryptoStore, TimeSpan timeout) { _astarteRealm = astarteRealm; _deviceId = deviceId; _credentialSecret = credentialSecret; _cryptoStore = astarteCryptoStore; - _AstartePairingService = new AstartePairingService(pairingUrl, astarteRealm); + _timeOut = timeout; + _AstartePairingService = new AstartePairingService(pairingUrl, astarteRealm, timeout); _certificate = _cryptoStore.GetCertificate(); if (_certificate == null) @@ -60,7 +62,7 @@ public async Task Init() private async Task ReloadTransports() { _transports = await _AstartePairingService.ReloadTransports(_credentialSecret, - _cryptoStore, _deviceId); + _cryptoStore, _deviceId, _timeOut); } public List GetTransports() diff --git a/AstarteDeviceSDKCSharp/AstartePairingService.cs b/AstarteDeviceSDKCSharp/AstartePairingService.cs index ab07f10..81fb9ee 100644 --- a/AstarteDeviceSDKCSharp/AstartePairingService.cs +++ b/AstarteDeviceSDKCSharp/AstartePairingService.cs @@ -39,16 +39,18 @@ public class AstartePairingService private readonly string _astarteRealm; private readonly HttpClient _httpClient; - public AstartePairingService(string pairingUrl, string astarteRealm) + public AstartePairingService(string pairingUrl, string astarteRealm, TimeSpan timeOut) { _astarteRealm = astarteRealm; _pairingUrl = new Uri($"{pairingUrl.TrimEnd('/')}/v1"); - _httpClient = new HttpClient(); - + _httpClient = new HttpClient + { + Timeout = timeOut + }; } internal async Task> ReloadTransports(string credentialSecret, - AstarteCryptoStore astarteCryptoStore, string deviceId) + AstarteCryptoStore astarteCryptoStore, string deviceId, TimeSpan timeOut) { List transports = new(); // Prepare the Pairing API request @@ -101,7 +103,8 @@ internal async Task> ReloadTransports(string credentialSe _astarteRealm, deviceId, item, - astarteCryptoStore); + astarteCryptoStore, + timeOut); transports.Add(supportedTransport); } @@ -135,6 +138,7 @@ public async Task RequestNewCertificate _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", credentialSecret); + try { diff --git a/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs b/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs index 524fa94..43bd2d3 100644 --- a/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs +++ b/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs @@ -68,6 +68,7 @@ public class AstarteDevice : IAstarteTransportEventListener /// It can be a shared directory for multiple devices, a subdirectory for the given device /// ID will be created. /// + /// The timeout duration for the connection. public AstarteDevice( string deviceId, string astarteRealm, @@ -75,6 +76,7 @@ public AstarteDevice( IAstarteInterfaceProvider astarteInterfaceProvider, string pairingBaseUrl, string cryptoStoreDirectory, + TimeSpan? timeOut, bool ignoreSSLErrors = false) { if (!Directory.Exists(cryptoStoreDirectory)) @@ -91,12 +93,15 @@ public AstarteDevice( AstarteCryptoStore astarteCryptoStore = new AstarteCryptoStore(fullCryptoDirPath); astarteCryptoStore.IgnoreSSLErrors = ignoreSSLErrors; + + _pairingHandler = new AstartePairingHandler( pairingBaseUrl, astarteRealm, deviceId, credentialSecret, - astarteCryptoStore); + astarteCryptoStore, + (TimeSpan)(timeOut is null ? TimeSpan.FromSeconds(5) : timeOut)); astartePropertyStorage = new AstartePropertyStorage(fullCryptoDirPath); @@ -180,7 +185,18 @@ private bool EventualyReconnect() { interval = MAX_INCREMENT_INTERVAL; } - Task.Run(async () => await Connect()).Wait(interval); + + try + { + Task.Run(async () => await Connect()).Wait(interval); + } + catch (AggregateException ex) + { + foreach (var innerException in ex.InnerExceptions) + { + Trace.WriteLine($"Inner Exception: {innerException.GetType().Name}: {innerException.Message}"); + } + } } } diff --git a/AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs b/AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs index b3fb457..b2511e7 100644 --- a/AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs +++ b/AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs @@ -29,7 +29,7 @@ internal class AstarteTransportFactory public static AstarteTransport? CreateAstarteTransportFromPairing (AstarteProtocolType protocolType, string astarteRealm, - string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore) + string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore, TimeSpan timeOut) { switch (protocolType) @@ -40,7 +40,8 @@ public static AstarteTransport? CreateAstarteTransportFromPairing new MutualSSLAuthenticationMqttConnectionInfo(brokerUrl, astarteRealm, deviceId, - astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters()) + astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters(), + timeOut) ); default: return null; diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs index 169fd39..3c78368 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs @@ -121,18 +121,30 @@ public override async Task Connect() _client = await InitClientAsync(); } - MqttClientConnectResult result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(), - CancellationToken.None).ConfigureAwait(false); - - if (result.ResultCode == MqttClientConnectResultCode.Success) + try { - await CompleteAstarteConnection(result.IsSessionPresent); + using (var timeoutToken = new CancellationTokenSource(_connectionInfo.GetTimeOut())) + { + MqttClientConnectResult result = await _client.ConnectAsync( + _connectionInfo.GetMqttConnectOptions(), + timeoutToken.Token); + + if (result.ResultCode == MqttClientConnectResultCode.Success) + { + await CompleteAstarteConnection(result.IsSessionPresent); + } + else + { + throw new AstarteTransportException + ($"Error connecting to MQTT. Code: {result.ResultCode}"); + } + } } - else + catch (OperationCanceledException) { - throw new AstarteTransportException - ($"Error connecting to MQTT. Code: {result.ResultCode}"); + Trace.WriteLine("Timeout while connecting."); } + } public override void Disconnect() diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs index ecb0d37..3bbffdf 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs @@ -69,7 +69,17 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface try { - await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + if (_client.TryPingAsync().Result) + { + await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + } + else + { + HandleDatastreamFailedPublish( + new MqttCommunicationException("Broker is not available."), + mapping, topic, payload, qos); + } + } catch (MqttCommunicationException ex) { @@ -143,7 +153,7 @@ public override async Task SendIntrospection() .Remove(introspectionStringBuilder.Length - 1, 1); string introspection = introspectionStringBuilder.ToString(); - await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), (MqttQualityOfServiceLevel)2); + await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), MqttQualityOfServiceLevel.ExactlyOnce); } public override async Task SendIndividualValue(AstarteInterface astarteInterface, @@ -170,11 +180,21 @@ public override async Task SendAggregate(AstarteAggregateDatastreamInterface ast try { - await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + if (_client.TryPingAsync().Result) + { + await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + } + else + { + HandleDatastreamFailedPublish( + new MqttCommunicationException("Broker is not available."), + mapping, topic, payload, qos); + } + } - catch (MqttCommunicationException e) + catch (MqttCommunicationException ex) { - HandleDatastreamFailedPublish(e, mapping, topic, payload, qos); + HandleDatastreamFailedPublish(ex, mapping, topic, payload, qos); _astarteTransportEventListener?.OnTransportDisconnected(); } catch (AstarteTransportException ex) diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs index 57bc3a5..768d4bf 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs @@ -30,5 +30,7 @@ public interface IMqttConnectionInfo string GetClientId(); MqttClientOptions GetMqttConnectOptions(); + + TimeSpan GetTimeOut(); } } diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs index 61f6b08..d16eca0 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/MutualSSLAuthenticationMqttConnectionInfo.cs @@ -28,9 +28,10 @@ public class MutualSSLAuthenticationMqttConnectionInfo : IMqttConnectionInfo private readonly Uri _brokerUrl; private readonly MqttClientOptions _mqttConnectOptions; private readonly string _clientId = string.Empty; + private readonly TimeSpan _timeOut; public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRealm, - string deviceId, MqttClientOptionsBuilderTlsParameters tlsOptions) + string deviceId, MqttClientOptionsBuilderTlsParameters tlsOptions, TimeSpan timeOut) { _brokerUrl = brokerUrl; _mqttConnectOptions = new MqttClientOptionsBuilder() @@ -40,8 +41,10 @@ public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRe .WithCleanSession(false) .WithKeepAlivePeriod(TimeSpan.FromSeconds(60)) .WithSessionExpiryInterval(0) + .WithTimeout(timeOut) .Build(); + _timeOut = timeOut; _clientId = $"{astarteRealm}/{deviceId}"; } @@ -49,6 +52,8 @@ public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRe public string GetClientId() => _clientId; + public TimeSpan GetTimeOut() => _timeOut; + public MqttClientOptions GetMqttConnectOptions() => _mqttConnectOptions; } } diff --git a/AstarteDeviceSDKCSharpE2E.Tests/Utilities/AstarteDeviceSingleton.cs b/AstarteDeviceSDKCSharpE2E.Tests/Utilities/AstarteDeviceSingleton.cs index 9d2017d..9360fad 100644 --- a/AstarteDeviceSDKCSharpE2E.Tests/Utilities/AstarteDeviceSingleton.cs +++ b/AstarteDeviceSDKCSharpE2E.Tests/Utilities/AstarteDeviceSingleton.cs @@ -62,6 +62,7 @@ public static AstarteDevice Instance new MockInterfaceProvider(), astarteMockData.PairingUrl, cryptoStoreDir, + TimeSpan.FromMilliseconds(500), true ); astarteDevice.SetAlwaysReconnect(true); diff --git a/AstarteDeviceSDKExample/Program.cs b/AstarteDeviceSDKExample/Program.cs index 3502a83..70a947c 100644 --- a/AstarteDeviceSDKExample/Program.cs +++ b/AstarteDeviceSDKExample/Program.cs @@ -71,7 +71,10 @@ static async Task Main(string[] args) Guid nameSpace = Guid.NewGuid(); string macAdress = "0099112233"; deviceId = AstarteDeviceIdUtils.GenerateId(nameSpace, macAdress); - credentialsSecret = await new AstartePairingService(pairingUrl, realm) + credentialsSecret = await new AstartePairingService( + pairingUrl, + realm, + TimeSpan.FromSeconds(1)) .RegisterDeviceWithJwtToken(deviceId, jwt); } @@ -93,7 +96,9 @@ static async Task Main(string[] args) credentialsSecret, interfaceProvider, pairingUrl, - cryptoStoreDir); + cryptoStoreDir, + TimeSpan.FromMilliseconds(500), + true); myDevice.SetAlwaysReconnect(true); myDevice.SetAstarteMessageListener(new ExampleMessageListener()); From b719ea0925b9ac4e00cd8bf60921537ac1abefa8 Mon Sep 17 00:00:00 2001 From: Osman Hadzic Date: Fri, 22 Mar 2024 13:34:43 +0100 Subject: [PATCH 3/3] Update CHANGELOG.md Signed-off-by: Osman Hadzic --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a1885d..596488d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [0.7.0] - Unreleased ### Added +- Add timeout to the `AstartePairingService` constructor. +- Add device connection timeout to the `ÀstarteDevice` constructor. +- Update version of MQTTNet libary from 3.1.2 to 4.1.4.563. - Add a fallout strategy for individual failed messages. - Resend failed messages stored in the cache memory.