Skip to content

Commit

Permalink
Merge pull request #69 from osmanhadzic/update-mqttnet-lib
Browse files Browse the repository at this point in the history
Update version MQTTnet library
  • Loading branch information
harlem88 authored Mar 22, 2024
2 parents 57cab2c + b719ea0 commit 50ce0b6
Show file tree
Hide file tree
Showing 16 changed files with 155 additions and 65 deletions.
14 changes: 7 additions & 7 deletions AstarteDeviceSDKCSharp.Tests/AstartePairingServiceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public async Task TestThrowExceptionOnUnsuccessfulRegisterDeviceWithJwtTokenAsyn
.WithHeader("Content-Type", "application/json; charset=utf-8")
);

_service = new AstartePairingService($"{_server.Urls[0]}", "test");
_service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500));
//Assert
_ = await Assert.ThrowsAsync<AstartePairingException>(() =>
//Act
Expand All @@ -75,7 +75,7 @@ public async Task TestThrowExceptionOnEmptyResponseDeviceWithJwtTokenAsync()
+ "}")
); ;

_service = new AstartePairingService($"{_server.Urls[0]}", "test");
_service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500));

//Assert
_ = await Assert.ThrowsAsync<AstartePairingException>(() =>
Expand Down Expand Up @@ -116,7 +116,7 @@ public async Task TestSuccessfullRegisterDeviceWithJwtTokenAsync()
.WithHeader("Content-Type", "application/json; charset=utf-8")
.WithBody(responseBody)
);
_service = new AstartePairingService($"{_server.Urls[0]}", "test");
_service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500));

//Act
string credentialSecret = await _service.RegisterDeviceWithJwtToken(deviceId, expectedCredentialSecret);
Expand Down Expand Up @@ -151,7 +151,7 @@ public async Task TestThrowExceptionOnUnsuccessfulRegisterDeviceWithPrivateKeyAs
.WithHeader("Content-Type", "application/json; charset=utf-8")
);

_service = new AstartePairingService($"{_server.Urls[0]}", "test");
_service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500));
//Assert
_ = await Assert.ThrowsAsync<AstartePairingException>(() =>
//Act
Expand Down Expand Up @@ -198,7 +198,7 @@ public async Task TestSuccessfullRegisterDeviceWithPrivateKeyAsync()
"JdorKpZMgsh6rzLQ==");
string privateKey = new(PemEncoding.Write("EC PRIVATE KEY", array).ToArray());

_service = new AstartePairingService($"{_server.Urls[0]}", "test");
_service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500));
var myPath = Path.GetDirectoryName(
System.Reflection.Assembly.GetExecutingAssembly().Location);
myPath = Path.Combine(myPath, "key.pem");
Expand Down Expand Up @@ -242,7 +242,7 @@ public void TestPairingServiceUrlConstructor_Local()
Uri _expectedPairingUrl = new Uri("http://localhost:4003");

//Act
_service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test");
_service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test", TimeSpan.FromMilliseconds(500));

Uri _actualPairingUrl = _service.PairingUrl();

Expand All @@ -260,7 +260,7 @@ public void TestPairingServiceUrlConstructor_Custom()
Uri _expectedPairingUrl = new Uri("https://astarte.instance.test/pairing");

//Act
_service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test");
_service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test", TimeSpan.FromMilliseconds(500));

Uri _actualPairingUrl = _service.PairingUrl();

Expand Down
2 changes: 1 addition & 1 deletion AstarteDeviceSDKCSharp/AstarteDeviceSDKCSharp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ SPDX-License-Identifier: Apache-2.0 -->
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="JWT" Version="9.0.3" />
<PackageReference Include="MQTTnet" Version="3.1.2" />
<PackageReference Include="MQTTnet" Version="4.1.4.563" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.2" />
<PackageReference Include="Newtonsoft.Json.Bson" Version="1.0.2" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="6.25.1" />
Expand Down
8 changes: 5 additions & 3 deletions AstarteDeviceSDKCSharp/AstartePairingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ public class AstartePairingHandler
readonly AstarteCryptoStore _cryptoStore;
private List<AstarteTransport>? _transports;
private X509Certificate2? _certificate;
private TimeSpan _timeOut;

public AstartePairingHandler(string pairingUrl, string astarteRealm, string deviceId,
string credentialSecret, AstarteCryptoStore astarteCryptoStore)
string credentialSecret, AstarteCryptoStore astarteCryptoStore, TimeSpan timeout)
{
_astarteRealm = astarteRealm;
_deviceId = deviceId;
_credentialSecret = credentialSecret;
_cryptoStore = astarteCryptoStore;
_AstartePairingService = new AstartePairingService(pairingUrl, astarteRealm);
_timeOut = timeout;
_AstartePairingService = new AstartePairingService(pairingUrl, astarteRealm, timeout);

_certificate = _cryptoStore.GetCertificate();
if (_certificate == null)
Expand All @@ -60,7 +62,7 @@ public async Task Init()
private async Task ReloadTransports()
{
_transports = await _AstartePairingService.ReloadTransports(_credentialSecret,
_cryptoStore, _deviceId);
_cryptoStore, _deviceId, _timeOut);
}

public List<AstarteTransport> GetTransports()
Expand Down
14 changes: 9 additions & 5 deletions AstarteDeviceSDKCSharp/AstartePairingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ public class AstartePairingService
private readonly string _astarteRealm;
private readonly HttpClient _httpClient;

public AstartePairingService(string pairingUrl, string astarteRealm)
public AstartePairingService(string pairingUrl, string astarteRealm, TimeSpan timeOut)
{
_astarteRealm = astarteRealm;
_pairingUrl = new Uri($"{pairingUrl.TrimEnd('/')}/v1");
_httpClient = new HttpClient();

_httpClient = new HttpClient
{
Timeout = timeOut
};
}

internal async Task<List<AstarteTransport>> ReloadTransports(string credentialSecret,
AstarteCryptoStore astarteCryptoStore, string deviceId)
AstarteCryptoStore astarteCryptoStore, string deviceId, TimeSpan timeOut)
{
List<AstarteTransport> transports = new();
// Prepare the Pairing API request
Expand Down Expand Up @@ -101,7 +103,8 @@ internal async Task<List<AstarteTransport>> ReloadTransports(string credentialSe
_astarteRealm,
deviceId,
item,
astarteCryptoStore);
astarteCryptoStore,
timeOut);

transports.Add(supportedTransport);
}
Expand Down Expand Up @@ -135,6 +138,7 @@ public async Task<X509Certificate2> RequestNewCertificate
_httpClient.DefaultRequestHeaders.Authorization =
new AuthenticationHeaderValue("Bearer", credentialSecret);


try
{

Expand Down
6 changes: 3 additions & 3 deletions AstarteDeviceSDKCSharp/Crypto/AstarteCryptoStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

using MQTTnet.Client.Options;
using MQTTnet.Client;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Text;
Expand Down Expand Up @@ -55,13 +55,13 @@ private void LoadNewCertificate()

}

public void SaveCertificateIfNotExist(X509Certificate2 x509Certificate)
public void SaveCertificateIfNotExist(X509Certificate2 x509Certificate2)
{
//Save certificate to file
try
{
File.WriteAllText(Path.Combine(_cryptoStoreDir, "device.crt"),
new(PemEncoding.Write("CERTIFICATE", x509Certificate.GetRawCertData()).ToArray()));
new(PemEncoding.Write("CERTIFICATE", x509Certificate2.GetRawCertData()).ToArray()));
}
catch (DirectoryNotFoundException ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

using MQTTnet.Client.Options;
using MQTTnet.Client;
using System.Security.Cryptography.X509Certificates;

namespace AstarteDeviceSDKCSharp.Crypto
{
public class AstarteMutualTLSParametersFactory : MqttClientOptionsBuilderTlsParameters
public class AstarteMutualTLSParametersFactory
{

private readonly MqttClientOptionsBuilderTlsParameters _tlsOptions;
Expand All @@ -33,14 +33,19 @@ public AstarteMutualTLSParametersFactory(IAstarteCryptoStore cryptoStore) : base
_tlsOptions = new MqttClientOptionsBuilderTlsParameters
{
UseTls = true,
Certificates = new List<X509Certificate?>
Certificates = new List<X509Certificate2?>
{
cryptoStore.GetCertificate()
},
IgnoreCertificateChainErrors = cryptoStore.IgnoreSSLErrors,
IgnoreCertificateRevocationErrors = cryptoStore.IgnoreSSLErrors,
SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
AllowUntrustedCertificates = cryptoStore.IgnoreSSLErrors
AllowUntrustedCertificates = cryptoStore.IgnoreSSLErrors,
CertificateValidationHandler = eventArgs =>
{
eventArgs.Certificate = cryptoStore.GetCertificate();
return true;
}
};
}

Expand Down
2 changes: 1 addition & 1 deletion AstarteDeviceSDKCSharp/Crypto/IAstarteCryptoStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

using MQTTnet.Client.Options;
using MQTTnet.Client;
using System.Security.Cryptography.X509Certificates;

namespace AstarteDeviceSDKCSharp.Crypto
Expand Down
20 changes: 18 additions & 2 deletions AstarteDeviceSDKCSharp/Device/AstarteDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ public class AstarteDevice : IAstarteTransportEventListener
/// It can be a shared directory for multiple devices, a subdirectory for the given device
/// ID will be created.
/// </param>
/// <param name="timeOut">The timeout duration for the connection.</param>
public AstarteDevice(
string deviceId,
string astarteRealm,
string credentialSecret,
IAstarteInterfaceProvider astarteInterfaceProvider,
string pairingBaseUrl,
string cryptoStoreDirectory,
TimeSpan? timeOut,
bool ignoreSSLErrors = false)
{
if (!Directory.Exists(cryptoStoreDirectory))
Expand All @@ -91,12 +93,15 @@ public AstarteDevice(
AstarteCryptoStore astarteCryptoStore = new AstarteCryptoStore(fullCryptoDirPath);
astarteCryptoStore.IgnoreSSLErrors = ignoreSSLErrors;



_pairingHandler = new AstartePairingHandler(
pairingBaseUrl,
astarteRealm,
deviceId,
credentialSecret,
astarteCryptoStore);
astarteCryptoStore,
(TimeSpan)(timeOut is null ? TimeSpan.FromSeconds(5) : timeOut));

astartePropertyStorage = new AstartePropertyStorage(fullCryptoDirPath);

Expand Down Expand Up @@ -180,7 +185,18 @@ private bool EventualyReconnect()
{
interval = MAX_INCREMENT_INTERVAL;
}
Task.Run(async () => await Connect()).Wait(interval);

try
{
Task.Run(async () => await Connect()).Wait(interval);
}
catch (AggregateException ex)
{
foreach (var innerException in ex.InnerExceptions)
{
Trace.WriteLine($"Inner Exception: {innerException.GetType().Name}: {innerException.Message}");
}
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal class AstarteTransportFactory

public static AstarteTransport? CreateAstarteTransportFromPairing
(AstarteProtocolType protocolType, string astarteRealm,
string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore)
string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore, TimeSpan timeOut)
{

switch (protocolType)
Expand All @@ -40,7 +40,8 @@ public static AstarteTransport? CreateAstarteTransportFromPairing
new MutualSSLAuthenticationMqttConnectionInfo(brokerUrl,
astarteRealm,
deviceId,
astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters())
astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters(),
timeOut)
);
default:
return null;
Expand Down
52 changes: 38 additions & 14 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
using AstarteDeviceSDKCSharp.Utilities;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Publishing;
using MQTTnet.Exceptions;
using System.Diagnostics;
using System.IO.Compression;
using System.Runtime.CompilerServices;
using System.Text;

namespace AstarteDeviceSDKCSharp.Transport.MQTT
Expand Down Expand Up @@ -94,8 +92,17 @@ private async Task CompleteAstarteConnection(bool IsSessionPresent)
Trace.WriteLine("Transport Connected");
}

_client.UseApplicationMessageReceivedHandler(OnMessageReceive);
_client.UseDisconnectedHandler(OnDisconnectAsync);
if (_client is not null)
{
_client.ApplicationMessageReceivedAsync += e =>
{
OnMessageReceive(e);
return Task.CompletedTask;
};

_client.DisconnectedAsync += OnDisconnectAsync;

}

}

Expand All @@ -114,18 +121,30 @@ public override async Task Connect()
_client = await InitClientAsync();
}

var result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(),
CancellationToken.None);

if (result.ResultCode == MqttClientConnectResultCode.Success)
try
{
await CompleteAstarteConnection(result.IsSessionPresent);
using (var timeoutToken = new CancellationTokenSource(_connectionInfo.GetTimeOut()))
{
MqttClientConnectResult result = await _client.ConnectAsync(
_connectionInfo.GetMqttConnectOptions(),
timeoutToken.Token);

if (result.ResultCode == MqttClientConnectResultCode.Success)
{
await CompleteAstarteConnection(result.IsSessionPresent);
}
else
{
throw new AstarteTransportException
($"Error connecting to MQTT. Code: {result.ResultCode}");
}
}
}
else
catch (OperationCanceledException)
{
throw new AstarteTransportException
($"Error connecting to MQTT. Code: {result.ResultCode}");
Trace.WriteLine("Timeout while connecting.");
}

}

public override void Disconnect()
Expand Down Expand Up @@ -192,6 +211,11 @@ private async Task SendEmptyCacheAsync()
.WithRetainFlag(false)
.Build();

if (_client is null)
{
return;
}

MqttClientPublishResult result = await _client.PublishAsync(applicationMessage);
if (result.ReasonCode != MqttClientPublishReasonCode.Success)
{
Expand All @@ -200,7 +224,7 @@ private async Task SendEmptyCacheAsync()
}
}

private Task OnDisconnectAsync(MqttClientDisconnectedEventArgs e)
Task OnDisconnectAsync(MqttClientDisconnectedEventArgs e)
{
if (_astarteTransportEventListener != null)
{
Expand Down
Loading

0 comments on commit 50ce0b6

Please sign in to comment.