Skip to content

Commit

Permalink
Update MQTTnet library
Browse files Browse the repository at this point in the history
Update MQTTnet library from 3.1.2 to version
4.1.4.563. Resolving braking changes with
namespaces.
Replace `UseApplicationMessageReceivedHandler` and
`UseDisconnectedHandler` handlers with events
`ApplicationMessageReceivedAsync` and
`DisconnectedAsync`.
Implement enum `MqttQualityOfServiceLevel` to
replace the int value Qos.

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Mar 17, 2024
1 parent 57cab2c commit 5bdfac6
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 35 deletions.
2 changes: 1 addition & 1 deletion AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ SPDX-License-Identifier: Apache-2.0 -->
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="JWT" Version="9.0.3" />
<PackageReference Include="MQTTnet" Version="3.1.2" />
<PackageReference Include="MQTTnet" Version="4.1.4.563" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
<PackageReference Include="Newtonsoft.Json.Bson" Version="1.0.2" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="6.25.1" />
Expand Down
6 changes: 3 additions & 3 deletions AstarteDeviceSDKCSharp/Crypto/AstarteCryptoStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

using MQTTnet.Client.Options;
using MQTTnet.Client;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Text;
Expand Down Expand Up @@ -55,13 +55,13 @@ private void LoadNewCertificate()

}

public void SaveCertificateIfNotExist(X509Certificate2 x509Certificate)
public void SaveCertificateIfNotExist(X509Certificate2 x509Certificate2)
{
//Save certificate to file
try
{
File.WriteAllText(Path.Combine(_cryptoStoreDir, "device.crt"),
new(PemEncoding.Write("CERTIFICATE", x509Certificate.GetRawCertData()).ToArray()));
new(PemEncoding.Write("CERTIFICATE", x509Certificate2.GetRawCertData()).ToArray()));
}
catch (DirectoryNotFoundException ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

using MQTTnet.Client.Options;
using MQTTnet.Client;
using System.Security.Cryptography.X509Certificates;

namespace AstarteDeviceSDKCSharp.Crypto
{
public class AstarteMutualTLSParametersFactory : MqttClientOptionsBuilderTlsParameters
public class AstarteMutualTLSParametersFactory
{

private readonly MqttClientOptionsBuilderTlsParameters _tlsOptions;
Expand All @@ -33,14 +33,19 @@ public AstarteMutualTLSParametersFactory(IAstarteCryptoStore cryptoStore) : base
_tlsOptions = new MqttClientOptionsBuilderTlsParameters
{
UseTls = true,
Certificates = new List<X509Certificate?>
Certificates = new List<X509Certificate2?>
{
cryptoStore.GetCertificate()
},
IgnoreCertificateChainErrors = cryptoStore.IgnoreSSLErrors,
IgnoreCertificateRevocationErrors = cryptoStore.IgnoreSSLErrors,
SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
AllowUntrustedCertificates = cryptoStore.IgnoreSSLErrors
AllowUntrustedCertificates = cryptoStore.IgnoreSSLErrors,
CertificateValidationHandler = eventArgs =>
{
eventArgs.Certificate = cryptoStore.GetCertificate();
return true;
}
};
}

Expand Down
2 changes: 1 addition & 1 deletion AstarteDeviceSDKCSharp/Crypto/IAstarteCryptoStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

using MQTTnet.Client.Options;
using MQTTnet.Client;
using System.Security.Cryptography.X509Certificates;

namespace AstarteDeviceSDKCSharp.Crypto
Expand Down
28 changes: 20 additions & 8 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
using AstarteDeviceSDKCSharp.Utilities;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Publishing;
using MQTTnet.Exceptions;
using System.Diagnostics;
using System.IO.Compression;
using System.Runtime.CompilerServices;
using System.Text;

namespace AstarteDeviceSDKCSharp.Transport.MQTT
Expand Down Expand Up @@ -94,8 +92,17 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent)
Trace.WriteLine("Transport Connected");
}

_client.UseApplicationMessageReceivedHandler(OnMessageReceive);
_client.UseDisconnectedHandler(OnDisconnectAsync);
if (_client is not null)
{
_client.ApplicationMessageReceivedAsync += e =>
{
OnMessageReceive(e);
return Task.CompletedTask;
};

_client.DisconnectedAsync += OnDisconnectAsync;

}

}

Expand All @@ -114,8 +121,8 @@ public override async Task Connect()
_client = await InitClientAsync();
}

var result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(),
CancellationToken.None);
MqttClientConnectResult result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(),
CancellationToken.None).ConfigureAwait(false);

if (result.ResultCode == MqttClientConnectResultCode.Success)
{
Expand Down Expand Up @@ -192,6 +199,11 @@ private async Task SendEmptyCacheAsync()
.WithRetainFlag(false)
.Build();

if (_client is null)
{
return;
}

MqttClientPublishResult result = await _client.PublishAsync(applicationMessage);
if (result.ReasonCode != MqttClientPublishReasonCode.Success)
{
Expand All @@ -200,7 +212,7 @@ private async Task SendEmptyCacheAsync()
}
}

private Task OnDisconnectAsync(MqttClientDisconnectedEventArgs e)
Task OnDisconnectAsync(MqttClientDisconnectedEventArgs e)
{
if (_astarteTransportEventListener != null)
{
Expand Down
27 changes: 15 additions & 12 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
using AstarteDeviceSDKCSharp.Utilities;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Publishing;
using MQTTnet.Exceptions;
using MQTTnet.Protocol;
using System.Text;
using static AstarteDeviceSDKCSharp.Protocol.AstarteInterfaceDatastreamMapping;

Expand Down Expand Up @@ -69,7 +69,7 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface

try
{
await DoSendMqttMessage(topic, payload, qos);
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
}
catch (MqttCommunicationException ex)
{
Expand All @@ -85,7 +85,7 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface

}

private async Task DoSendMqttMessage(string topic, byte[] payload, int qos)
private async Task DoSendMqttMessage(string topic, byte[] payload, MqttQualityOfServiceLevel qos)
{
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
Expand All @@ -96,13 +96,16 @@ private async Task DoSendMqttMessage(string topic, byte[] payload, int qos)

try
{
if (_client is not null)
{
MqttClientPublishResult result = await _client.PublishAsync(applicationMessage);

MqttClientPublishResult result = await _client.PublishAsync(applicationMessage);
if (result.ReasonCode != MqttClientPublishReasonCode.Success)
{
throw new AstarteTransportException
($"Error publishing on MQTT. Code: {result.ReasonCode}");
}

if (result.ReasonCode != MqttClientPublishReasonCode.Success)
{
throw new AstarteTransportException
($"Error publishing on MQTT. Code: {result.ReasonCode}");
}

}
Expand Down Expand Up @@ -140,7 +143,7 @@ public override async Task SendIntrospection()
.Remove(introspectionStringBuilder.Length - 1, 1);
string introspection = introspectionStringBuilder.ToString();

await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), 2);
await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), (MqttQualityOfServiceLevel)2);
}

public override async Task SendIndividualValue(AstarteInterface astarteInterface,
Expand All @@ -167,7 +170,7 @@ public override async Task SendAggregate(AstarteAggregateDatastreamInterface ast

try
{
await DoSendMqttMessage(topic, payload, qos);
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
}
catch (MqttCommunicationException e)
{
Expand Down Expand Up @@ -251,7 +254,7 @@ public override void RetryFailedMessages()
private async Task DoSendMessage(IAstarteFailedMessage failedMessage)
{
await DoSendMqttMessage(failedMessage.GetTopic(), failedMessage.GetPayload(),
failedMessage.GetQos());
(MqttQualityOfServiceLevel)failedMessage.GetQos());
}

private int QosFromReliability(AstarteInterfaceDatastreamMapping mapping)
Expand Down Expand Up @@ -322,7 +325,7 @@ private async void DoSendMqttMessage(IAstarteFailedMessage failedMessage)
{
await DoSendMqttMessage(failedMessage.GetTopic(),
failedMessage.GetPayload(),
failedMessage.GetQos());
(MqttQualityOfServiceLevel)failedMessage.GetQos());
}

private void HandleDatastreamFailedPublish(MqttCommunicationException e,
Expand Down
4 changes: 2 additions & 2 deletions AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

using MQTTnet.Client.Options;
using MQTTnet.Client;


namespace AstarteDeviceSDKCSharp.Transport.MQTT
Expand All @@ -29,6 +29,6 @@ public interface IMqttConnectionInfo

string GetClientId();

IMqttClientOptions GetMqttConnectOptions();
MqttClientOptions GetMqttConnectOptions();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
* SPDX-License-Identifier: Apache-2.0
*/

using MQTTnet.Client.Options;
using MQTTnet.Client;

namespace AstarteDeviceSDKCSharp.Transport.MQTT
{
public class MutualSSLAuthenticationMqttConnectionInfo : IMqttConnectionInfo
{

private readonly Uri _brokerUrl;
private readonly IMqttClientOptions _mqttConnectOptions;
private readonly MqttClientOptions _mqttConnectOptions;
private readonly string _clientId = string.Empty;

public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRealm,
Expand All @@ -38,7 +38,6 @@ public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRe
.WithTcpServer(_brokerUrl.Host, _brokerUrl.Port)
.WithTls(tlsOptions)
.WithCleanSession(false)
.WithCommunicationTimeout(TimeSpan.FromSeconds(60))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
.WithSessionExpiryInterval(0)
.Build();
Expand All @@ -50,6 +49,6 @@ public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRe

public string GetClientId() => _clientId;

public IMqttClientOptions GetMqttConnectOptions() => _mqttConnectOptions;
public MqttClientOptions GetMqttConnectOptions() => _mqttConnectOptions;
}
}

0 comments on commit 5bdfac6

Please sign in to comment.