Skip to content

Commit

Permalink
Refactor strategy for fallback messages
Browse files Browse the repository at this point in the history
Handling exceptions for deadlock threads.
Implement `TryPingAsync` for check is broker
available for sending messages.
Add timeout time to device constructor, timeOut
duration for the connection. Implement new field
in constructors  E2E and Example.
Add timeout for HTTP request on Pairing API.

Signed-off-by: Osman Hadzic <osman.hadzic@secomind.com>
  • Loading branch information
osmanhadzic committed Mar 22, 2024
1 parent 5bdfac6 commit 455af7a
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 35 deletions.
14 changes: 7 additions & 7 deletions AstarteDeviceSDKCSharp.Tests/AstartePairingServiceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public async Task TestThrowExceptionOnUnsuccessfulRegisterDeviceWithJwtTokenAsyn
.WithHeader("Content-Type", "application/json; charset=utf-8")
);

_service = new AstartePairingService($"{_server.Urls[0]}", "test");
_service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500));
//Assert
_ = await Assert.ThrowsAsync<AstartePairingException>(() =>
//Act
Expand All @@ -75,7 +75,7 @@ public async Task TestThrowExceptionOnEmptyResponseDeviceWithJwtTokenAsync()
+ "}")
); ;

_service = new AstartePairingService($"{_server.Urls[0]}", "test");
_service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500));

//Assert
_ = await Assert.ThrowsAsync<AstartePairingException>(() =>
Expand Down Expand Up @@ -116,7 +116,7 @@ public async Task TestSuccessfullRegisterDeviceWithJwtTokenAsync()
.WithHeader("Content-Type", "application/json; charset=utf-8")
.WithBody(responseBody)
);
_service = new AstartePairingService($"{_server.Urls[0]}", "test");
_service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500));

//Act
string credentialSecret = await _service.RegisterDeviceWithJwtToken(deviceId, expectedCredentialSecret);
Expand Down Expand Up @@ -151,7 +151,7 @@ public async Task TestThrowExceptionOnUnsuccessfulRegisterDeviceWithPrivateKeyAs
.WithHeader("Content-Type", "application/json; charset=utf-8")
);

_service = new AstartePairingService($"{_server.Urls[0]}", "test");
_service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500));
//Assert
_ = await Assert.ThrowsAsync<AstartePairingException>(() =>
//Act
Expand Down Expand Up @@ -198,7 +198,7 @@ public async Task TestSuccessfullRegisterDeviceWithPrivateKeyAsync()
"JdorKpZMgsh6rzLQ==");
string privateKey = new(PemEncoding.Write("EC PRIVATE KEY", array).ToArray());

_service = new AstartePairingService($"{_server.Urls[0]}", "test");
_service = new AstartePairingService($"{_server.Urls[0]}", "test", TimeSpan.FromMilliseconds(500));
var myPath = Path.GetDirectoryName(
System.Reflection.Assembly.GetExecutingAssembly().Location);
myPath = Path.Combine(myPath, "key.pem");
Expand Down Expand Up @@ -242,7 +242,7 @@ public void TestPairingServiceUrlConstructor_Local()
Uri _expectedPairingUrl = new Uri("http://localhost:4003");

//Act
_service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test");
_service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test", TimeSpan.FromMilliseconds(500));

Uri _actualPairingUrl = _service.PairingUrl();

Expand All @@ -260,7 +260,7 @@ public void TestPairingServiceUrlConstructor_Custom()
Uri _expectedPairingUrl = new Uri("https://astarte.instance.test/pairing");

//Act
_service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test");
_service = new AstartePairingService(_expectedPairingUrl.OriginalString, "test", TimeSpan.FromMilliseconds(500));

Uri _actualPairingUrl = _service.PairingUrl();

Expand Down
8 changes: 5 additions & 3 deletions AstarteDeviceSDKCSharp/AstartePairingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ public class AstartePairingHandler
readonly AstarteCryptoStore _cryptoStore;
private List<AstarteTransport>? _transports;
private X509Certificate2? _certificate;
private TimeSpan _timeOut;

public AstartePairingHandler(string pairingUrl, string astarteRealm, string deviceId,
string credentialSecret, AstarteCryptoStore astarteCryptoStore)
string credentialSecret, AstarteCryptoStore astarteCryptoStore, TimeSpan timeout)
{
_astarteRealm = astarteRealm;
_deviceId = deviceId;
_credentialSecret = credentialSecret;
_cryptoStore = astarteCryptoStore;
_AstartePairingService = new AstartePairingService(pairingUrl, astarteRealm);
_timeOut = timeout;
_AstartePairingService = new AstartePairingService(pairingUrl, astarteRealm, timeout);

_certificate = _cryptoStore.GetCertificate();
if (_certificate == null)
Expand All @@ -60,7 +62,7 @@ public async Task Init()
private async Task ReloadTransports()
{
_transports = await _AstartePairingService.ReloadTransports(_credentialSecret,
_cryptoStore, _deviceId);
_cryptoStore, _deviceId, _timeOut);
}

public List<AstarteTransport> GetTransports()
Expand Down
14 changes: 9 additions & 5 deletions AstarteDeviceSDKCSharp/AstartePairingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ public class AstartePairingService
private readonly string _astarteRealm;
private readonly HttpClient _httpClient;

public AstartePairingService(string pairingUrl, string astarteRealm)
public AstartePairingService(string pairingUrl, string astarteRealm, TimeSpan timeOut)
{
_astarteRealm = astarteRealm;
_pairingUrl = new Uri($"{pairingUrl.TrimEnd('/')}/v1");
_httpClient = new HttpClient();

_httpClient = new HttpClient
{
Timeout = timeOut
};
}

internal async Task<List<AstarteTransport>> ReloadTransports(string credentialSecret,
AstarteCryptoStore astarteCryptoStore, string deviceId)
AstarteCryptoStore astarteCryptoStore, string deviceId, TimeSpan timeOut)
{
List<AstarteTransport> transports = new();
// Prepare the Pairing API request
Expand Down Expand Up @@ -101,7 +103,8 @@ internal async Task<List<AstarteTransport>> ReloadTransports(string credentialSe
_astarteRealm,
deviceId,
item,
astarteCryptoStore);
astarteCryptoStore,
timeOut);

transports.Add(supportedTransport);
}
Expand Down Expand Up @@ -135,6 +138,7 @@ public async Task<X509Certificate2> RequestNewCertificate
_httpClient.DefaultRequestHeaders.Authorization =
new AuthenticationHeaderValue("Bearer", credentialSecret);


try
{

Expand Down
20 changes: 18 additions & 2 deletions AstarteDeviceSDKCSharp/Device/AstarteDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ public class AstarteDevice : IAstarteTransportEventListener
/// It can be a shared directory for multiple devices, a subdirectory for the given device
/// ID will be created.
/// </param>
/// <param name="timeOut">The timeout duration for the connection.</param>
public AstarteDevice(
string deviceId,
string astarteRealm,
string credentialSecret,
IAstarteInterfaceProvider astarteInterfaceProvider,
string pairingBaseUrl,
string cryptoStoreDirectory,
TimeSpan? timeOut,
bool ignoreSSLErrors = false)
{
if (!Directory.Exists(cryptoStoreDirectory))
Expand All @@ -91,12 +93,15 @@ public AstarteDevice(
AstarteCryptoStore astarteCryptoStore = new AstarteCryptoStore(fullCryptoDirPath);
astarteCryptoStore.IgnoreSSLErrors = ignoreSSLErrors;



_pairingHandler = new AstartePairingHandler(
pairingBaseUrl,
astarteRealm,
deviceId,
credentialSecret,
astarteCryptoStore);
astarteCryptoStore,
(TimeSpan)(timeOut is null ? TimeSpan.FromSeconds(5) : timeOut));

astartePropertyStorage = new AstartePropertyStorage(fullCryptoDirPath);

Expand Down Expand Up @@ -180,7 +185,18 @@ private bool EventualyReconnect()
{
interval = MAX_INCREMENT_INTERVAL;
}
Task.Run(async () => await Connect()).Wait(interval);

try
{
Task.Run(async () => await Connect()).Wait(interval);
}
catch (AggregateException ex)
{
foreach (var innerException in ex.InnerExceptions)
{
Trace.WriteLine($"Inner Exception: {innerException.GetType().Name}: {innerException.Message}");
}
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions AstarteDeviceSDKCSharp/Transport/AstarteTransportFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal class AstarteTransportFactory

public static AstarteTransport? CreateAstarteTransportFromPairing
(AstarteProtocolType protocolType, string astarteRealm,
string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore)
string deviceId, dynamic protocolData, AstarteCryptoStore astarteCryptoStore, TimeSpan timeOut)
{

switch (protocolType)
Expand All @@ -40,7 +40,8 @@ public static AstarteTransport? CreateAstarteTransportFromPairing
new MutualSSLAuthenticationMqttConnectionInfo(brokerUrl,
astarteRealm,
deviceId,
astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters())
astarteCryptoStore.GetMqttClientOptionsBuilderTlsParameters(),
timeOut)
);
default:
return null;
Expand Down
28 changes: 20 additions & 8 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,30 @@ public override async Task Connect()
_client = await InitClientAsync();
}

MqttClientConnectResult result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(),
CancellationToken.None).ConfigureAwait(false);

if (result.ResultCode == MqttClientConnectResultCode.Success)
try
{
await CompleteAstarteConnection(result.IsSessionPresent);
using (var timeoutToken = new CancellationTokenSource(_connectionInfo.GetTimeOut()))
{
MqttClientConnectResult result = await _client.ConnectAsync(
_connectionInfo.GetMqttConnectOptions(),
timeoutToken.Token);

if (result.ResultCode == MqttClientConnectResultCode.Success)
{
await CompleteAstarteConnection(result.IsSessionPresent);
}
else
{
throw new AstarteTransportException
($"Error connecting to MQTT. Code: {result.ResultCode}");
}
}
}
else
catch (OperationCanceledException)
{
throw new AstarteTransportException
($"Error connecting to MQTT. Code: {result.ResultCode}");
Trace.WriteLine("Timeout while connecting.");
}

}

public override void Disconnect()
Expand Down
30 changes: 25 additions & 5 deletions AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,17 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface

try
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
if (_client.TryPingAsync().Result)
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
}
else
{
HandleDatastreamFailedPublish(
new MqttCommunicationException("Broker is not available."),
mapping, topic, payload, qos);
}

}
catch (MqttCommunicationException ex)
{
Expand Down Expand Up @@ -143,7 +153,7 @@ public override async Task SendIntrospection()
.Remove(introspectionStringBuilder.Length - 1, 1);
string introspection = introspectionStringBuilder.ToString();

await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), (MqttQualityOfServiceLevel)2);
await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), MqttQualityOfServiceLevel.ExactlyOnce);
}

public override async Task SendIndividualValue(AstarteInterface astarteInterface,
Expand All @@ -170,11 +180,21 @@ public override async Task SendAggregate(AstarteAggregateDatastreamInterface ast

try
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
if (_client.TryPingAsync().Result)
{
await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos);
}
else
{
HandleDatastreamFailedPublish(
new MqttCommunicationException("Broker is not avelabe"),
mapping, topic, payload, qos);
}

}
catch (MqttCommunicationException e)
catch (MqttCommunicationException ex)
{
HandleDatastreamFailedPublish(e, mapping, topic, payload, qos);
HandleDatastreamFailedPublish(ex, mapping, topic, payload, qos);
_astarteTransportEventListener?.OnTransportDisconnected();
}
catch (AstarteTransportException ex)
Expand Down
2 changes: 2 additions & 0 deletions AstarteDeviceSDKCSharp/Transport/MQTT/IMqttConnectionInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ public interface IMqttConnectionInfo
string GetClientId();

MqttClientOptions GetMqttConnectOptions();

TimeSpan GetTimeOut();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ public class MutualSSLAuthenticationMqttConnectionInfo : IMqttConnectionInfo
private readonly Uri _brokerUrl;
private readonly MqttClientOptions _mqttConnectOptions;
private readonly string _clientId = string.Empty;
private readonly TimeSpan _timeOut;

public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRealm,
string deviceId, MqttClientOptionsBuilderTlsParameters tlsOptions)
string deviceId, MqttClientOptionsBuilderTlsParameters tlsOptions, TimeSpan timeOut)
{
_brokerUrl = brokerUrl;
_mqttConnectOptions = new MqttClientOptionsBuilder()
Expand All @@ -40,15 +41,19 @@ public MutualSSLAuthenticationMqttConnectionInfo(Uri brokerUrl, string astarteRe
.WithCleanSession(false)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
.WithSessionExpiryInterval(0)
.WithTimeout(timeOut)
.Build();

_timeOut = timeOut;
_clientId = $"{astarteRealm}/{deviceId}";
}

public Uri GetBrokerUrl() => _brokerUrl;

public string GetClientId() => _clientId;

public TimeSpan GetTimeOut() => _timeOut;

public MqttClientOptions GetMqttConnectOptions() => _mqttConnectOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static AstarteDevice Instance
new MockInterfaceProvider(),
astarteMockData.PairingUrl,
cryptoStoreDir,
TimeSpan.FromMilliseconds(500),
true
);
astarteDevice.SetAlwaysReconnect(true);
Expand Down
9 changes: 7 additions & 2 deletions AstarteDeviceSDKExample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ static async Task<int> Main(string[] args)
Guid nameSpace = Guid.NewGuid();
string macAdress = "0099112233";
deviceId = AstarteDeviceIdUtils.GenerateId(nameSpace, macAdress);
credentialsSecret = await new AstartePairingService(pairingUrl, realm)
credentialsSecret = await new AstartePairingService(
pairingUrl,
realm,
TimeSpan.FromSeconds(1))
.RegisterDeviceWithJwtToken(deviceId, jwt);
}

Expand All @@ -93,7 +96,9 @@ static async Task<int> Main(string[] args)
credentialsSecret,
interfaceProvider,
pairingUrl,
cryptoStoreDir);
cryptoStoreDir,
TimeSpan.FromMilliseconds(500),
true);

myDevice.SetAlwaysReconnect(true);
myDevice.SetAstarteMessageListener(new ExampleMessageListener());
Expand Down

0 comments on commit 455af7a

Please sign in to comment.