diff --git a/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs b/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs index 524fa94..3619640 100644 --- a/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs +++ b/AstarteDeviceSDKCSharp/Device/AstarteDevice.cs @@ -180,7 +180,19 @@ private bool EventualyReconnect() { interval = MAX_INCREMENT_INTERVAL; } - Task.Run(async () => await Connect()).Wait(interval); + + try + { + Task.Run(async () => await Connect()).Wait(interval); + } + catch (AggregateException ex) + { + foreach (var innerException in ex.InnerExceptions) + { + // Handle each individual exception + Console.WriteLine($"Inner Exception: {innerException.GetType().Name}: {innerException.Message}"); + } + } } } diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs index 169fd39..e7b7159 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttTransport.cs @@ -121,18 +121,30 @@ public override async Task Connect() _client = await InitClientAsync(); } - MqttClientConnectResult result = await _client.ConnectAsync(_connectionInfo.GetMqttConnectOptions(), - CancellationToken.None).ConfigureAwait(false); - - if (result.ResultCode == MqttClientConnectResultCode.Success) + try { - await CompleteAstarteConnection(result.IsSessionPresent); + using (var timeoutToken = new CancellationTokenSource(TimeSpan.FromMilliseconds(500))) + { + MqttClientConnectResult result = await _client.ConnectAsync( + _connectionInfo.GetMqttConnectOptions(), + timeoutToken.Token); + + if (result.ResultCode == MqttClientConnectResultCode.Success) + { + await CompleteAstarteConnection(result.IsSessionPresent); + } + else + { + throw new AstarteTransportException + ($"Error connecting to MQTT. Code: {result.ResultCode}"); + } + } } - else + catch (OperationCanceledException) { - throw new AstarteTransportException - ($"Error connecting to MQTT. Code: {result.ResultCode}"); + Console.WriteLine("Timeout while connecting."); } + } public override void Disconnect() diff --git a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs index ecb0d37..0816365 100644 --- a/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs +++ b/AstarteDeviceSDKCSharp/Transport/MQTT/AstarteMqttV1Transport.cs @@ -69,11 +69,20 @@ public override async Task SendIndividualValue(AstarteInterface astarteInterface try { - await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + if (_client.TryPingAsync().Result) + { + await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + } + else + { + HandleDatastreamFailedPublish( + new MqttCommunicationException("Broker is not available."), + mapping, topic, payload, qos); + } + } - catch (MqttCommunicationException ex) + catch (MqttCommunicationException) { - HandleDatastreamFailedPublish(ex, mapping, topic, payload, qos); _astarteTransportEventListener?.OnTransportDisconnected(); } catch (AstarteTransportException ex) @@ -143,7 +152,7 @@ public override async Task SendIntrospection() .Remove(introspectionStringBuilder.Length - 1, 1); string introspection = introspectionStringBuilder.ToString(); - await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), (MqttQualityOfServiceLevel)2); + await DoSendMqttMessage(_baseTopic, Encoding.ASCII.GetBytes(introspection), MqttQualityOfServiceLevel.ExactlyOnce); } public override async Task SendIndividualValue(AstarteInterface astarteInterface, @@ -170,11 +179,20 @@ public override async Task SendAggregate(AstarteAggregateDatastreamInterface ast try { - await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + if (_client.TryPingAsync().Result) + { + await DoSendMqttMessage(topic, payload, (MqttQualityOfServiceLevel)qos); + } + else + { + HandleDatastreamFailedPublish( + new MqttCommunicationException("Broker is not avelabe"), + mapping, topic, payload, qos); + } + } - catch (MqttCommunicationException e) + catch (MqttCommunicationException) { - HandleDatastreamFailedPublish(e, mapping, topic, payload, qos); _astarteTransportEventListener?.OnTransportDisconnected(); } catch (AstarteTransportException ex)