From a475fa17ca502a206d1b07bb440a22eaf41c3ef3 Mon Sep 17 00:00:00 2001 From: Justin Boswell Date: Tue, 5 Nov 2019 16:51:59 -0800 Subject: [PATCH] Updated to CRT v0.4.2, switched ByteBuffers to byte[] in MQTT (#22) * Updated to CRT v0.4.2, switched ByteBuffers to byte[] in MQTT --- .gitignore | 2 ++ .../src/main/java/pubsub/PubSub.java | 7 ++-- .../main/java/pubsubstress/PubSubStress.java | 6 ++-- .../src/main/java/shadow/ShadowSample.java | 1 - sdk/pom.xml | 2 +- .../awssdk/iot/iotjobs/IotJobsClient.java | 36 ++++++++----------- .../awssdk/iot/iotshadow/IotShadowClient.java | 30 +++++++--------- 7 files changed, 33 insertions(+), 51 deletions(-) diff --git a/.gitignore b/.gitignore index db2f5c94f..3bd89fbe6 100644 --- a/.gitignore +++ b/.gitignore @@ -174,3 +174,5 @@ $RECYCLE.BIN/ .project bin/ +.classpath +.settings \ No newline at end of file diff --git a/samples/BasicPubSub/src/main/java/pubsub/PubSub.java b/samples/BasicPubSub/src/main/java/pubsub/PubSub.java index cec93dbb7..2d938ab01 100644 --- a/samples/BasicPubSub/src/main/java/pubsub/PubSub.java +++ b/samples/BasicPubSub/src/main/java/pubsub/PubSub.java @@ -17,7 +17,6 @@ import software.amazon.awssdk.crt.CRT; import software.amazon.awssdk.crt.CrtRuntimeException; import software.amazon.awssdk.crt.io.ClientBootstrap; -import software.amazon.awssdk.crt.io.EventLoopGroup; import software.amazon.awssdk.crt.io.TlsContext; import software.amazon.awssdk.crt.io.TlsContextOptions; import software.amazon.awssdk.crt.mqtt.MqttClient; @@ -169,7 +168,7 @@ public void onConnectionResumed(boolean sessionPresent) { CompletableFuture subscribed = connection.subscribe(topic, QualityOfService.AT_LEAST_ONCE, (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); System.out.println("MESSAGE: " + payload); } catch (UnsupportedEncodingException ex) { System.out.println("Unable to decode payload: " + ex.getMessage()); @@ -180,9 +179,7 @@ public void onConnectionResumed(boolean sessionPresent) { int count = 0; while (count++ < messagesToPublish) { - ByteBuffer payload = ByteBuffer.allocateDirect(message.length()); - payload.put(message.getBytes()); - CompletableFuture published = connection.publish(new MqttMessage(topic, payload), QualityOfService.AT_LEAST_ONCE, false); + CompletableFuture published = connection.publish(new MqttMessage(topic, message.getBytes()), QualityOfService.AT_LEAST_ONCE, false); published.get(); Thread.sleep(1000); } diff --git a/samples/PubSubStress/src/main/java/pubsubstress/PubSubStress.java b/samples/PubSubStress/src/main/java/pubsubstress/PubSubStress.java index 5d08f9bce..bacca718c 100644 --- a/samples/PubSubStress/src/main/java/pubsubstress/PubSubStress.java +++ b/samples/PubSubStress/src/main/java/pubsubstress/PubSubStress.java @@ -222,7 +222,7 @@ public void onConnectionResumed(boolean sessionPresent) { String clientTopic = String.format("%s%d", topic, i); connectionState.subscribeFuture = connectionState.connection.subscribe(clientTopic, QualityOfService.AT_LEAST_ONCE, (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); System.out.println(String.format("(Topic %s): MESSAGE: %s", clientTopic, payload)); } catch (UnsupportedEncodingException ex) { System.out.println(String.format("(Topic %s): Unable to decode payload: %s", clientTopic, ex.getMessage())); @@ -307,8 +307,6 @@ public static void main(String[] args) { for(int count = 0; count < messagesToPublish; ++count) { String messageContent = String.format("%s #%d", message, count + 1); - ByteBuffer payload = ByteBuffer.allocateDirect(messageContent.length()); - payload.put(messageContent.getBytes()); // Pick a random connection to publish from int connectionIndex = validIndices.get(Math.abs(rng.nextInt()) % validIndices.size()); @@ -319,7 +317,7 @@ public static void main(String[] args) { int topicIndex = validIndices.get(Math.abs(rng.nextInt()) % validIndices.size()); String publishTopic = String.format("%s%d", topic, topicIndex); - publishFutures.add(connection.publish(new MqttMessage(publishTopic, payload), QualityOfService.AT_LEAST_ONCE, false)); + publishFutures.add(connection.publish(new MqttMessage(publishTopic, messageContent.getBytes()), QualityOfService.AT_LEAST_ONCE, false)); if (count % PROGRESS_OP_COUNT == 0) { System.out.println(String.format("(Main Thread) Message publish count: %d", count)); diff --git a/samples/Shadow/src/main/java/shadow/ShadowSample.java b/samples/Shadow/src/main/java/shadow/ShadowSample.java index 54198f601..a7f84d21a 100644 --- a/samples/Shadow/src/main/java/shadow/ShadowSample.java +++ b/samples/Shadow/src/main/java/shadow/ShadowSample.java @@ -17,7 +17,6 @@ import software.amazon.awssdk.crt.CRT; import software.amazon.awssdk.crt.CrtRuntimeException; import software.amazon.awssdk.crt.io.ClientBootstrap; -import software.amazon.awssdk.crt.io.EventLoopGroup; import software.amazon.awssdk.crt.io.TlsContext; import software.amazon.awssdk.crt.io.TlsContextOptions; import software.amazon.awssdk.crt.mqtt.MqttClient; diff --git a/sdk/pom.xml b/sdk/pom.xml index 97ccf0323..40dc7127b 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -41,7 +41,7 @@ software.amazon.awssdk.crt aws-crt - 0.3.29 + 0.4.2 compile diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotjobs/IotJobsClient.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotjobs/IotJobsClient.java index e1419238b..53beecb40 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/iotjobs/IotJobsClient.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotjobs/IotJobsClient.java @@ -91,7 +91,7 @@ public CompletableFuture SubscribeToJobExecutionsChangedEvents( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); JobExecutionsChangedEvent response = gson.fromJson(payload, JobExecutionsChangedEvent.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -124,7 +124,7 @@ public CompletableFuture SubscribeToStartNextPendingJobExecutionAccepte topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); StartNextJobExecutionResponse response = gson.fromJson(payload, StartNextJobExecutionResponse.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -163,7 +163,7 @@ public CompletableFuture SubscribeToDescribeJobExecutionRejected( topic = topic.replace("{jobId}", request.jobId); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); RejectedError response = gson.fromJson(payload, RejectedError.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -196,7 +196,7 @@ public CompletableFuture SubscribeToNextJobExecutionChangedEvents( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); NextJobExecutionChangedEvent response = gson.fromJson(payload, NextJobExecutionChangedEvent.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -235,7 +235,7 @@ public CompletableFuture SubscribeToUpdateJobExecutionRejected( topic = topic.replace("{jobId}", request.jobId); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); RejectedError response = gson.fromJson(payload, RejectedError.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -274,7 +274,7 @@ public CompletableFuture SubscribeToUpdateJobExecutionAccepted( topic = topic.replace("{jobId}", request.jobId); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); UpdateJobExecutionResponse response = gson.fromJson(payload, UpdateJobExecutionResponse.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -310,9 +310,7 @@ public CompletableFuture PublishUpdateJobExecution( } topic = topic.replace("{jobId}", request.jobId); String payloadJson = gson.toJson(request); - ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length()); - payload.put(payloadJson.getBytes()); - MqttMessage message = new MqttMessage(topic, payload); + MqttMessage message = new MqttMessage(topic, payloadJson.getBytes()); return connection.publish(message, qos, false); } @@ -336,7 +334,7 @@ public CompletableFuture SubscribeToDescribeJobExecutionAccepted( topic = topic.replace("{jobId}", request.jobId); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); DescribeJobExecutionResponse response = gson.fromJson(payload, DescribeJobExecutionResponse.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -366,9 +364,7 @@ public CompletableFuture PublishGetPendingJobExecutions( } topic = topic.replace("{thingName}", request.thingName); String payloadJson = gson.toJson(request); - ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length()); - payload.put(payloadJson.getBytes()); - MqttMessage message = new MqttMessage(topic, payload); + MqttMessage message = new MqttMessage(topic, payloadJson.getBytes()); return connection.publish(message, qos, false); } @@ -386,7 +382,7 @@ public CompletableFuture SubscribeToGetPendingJobExecutionsAccepted( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); GetPendingJobExecutionsResponse response = gson.fromJson(payload, GetPendingJobExecutionsResponse.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -419,7 +415,7 @@ public CompletableFuture SubscribeToStartNextPendingJobExecutionRejecte topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); RejectedError response = gson.fromJson(payload, RejectedError.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -452,7 +448,7 @@ public CompletableFuture SubscribeToGetPendingJobExecutionsRejected( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); RejectedError response = gson.fromJson(payload, RejectedError.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -482,9 +478,7 @@ public CompletableFuture PublishStartNextPendingJobExecution( } topic = topic.replace("{thingName}", request.thingName); String payloadJson = gson.toJson(request); - ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length()); - payload.put(payloadJson.getBytes()); - MqttMessage message = new MqttMessage(topic, payload); + MqttMessage message = new MqttMessage(topic, payloadJson.getBytes()); return connection.publish(message, qos, false); } @@ -505,9 +499,7 @@ public CompletableFuture PublishDescribeJobExecution( } topic = topic.replace("{thingName}", request.thingName); String payloadJson = gson.toJson(request); - ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length()); - payload.put(payloadJson.getBytes()); - MqttMessage message = new MqttMessage(topic, payload); + MqttMessage message = new MqttMessage(topic, payloadJson.getBytes()); return connection.publish(message, qos, false); } diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/iotshadow/IotShadowClient.java b/sdk/src/main/java/software/amazon/awssdk/iot/iotshadow/IotShadowClient.java index bb5cf7bda..3e724798d 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/iotshadow/IotShadowClient.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/iotshadow/IotShadowClient.java @@ -85,7 +85,7 @@ public CompletableFuture SubscribeToUpdateShadowRejected( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); ErrorResponse response = gson.fromJson(payload, ErrorResponse.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -115,9 +115,7 @@ public CompletableFuture PublishUpdateShadow( } topic = topic.replace("{thingName}", request.thingName); String payloadJson = gson.toJson(request); - ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length()); - payload.put(payloadJson.getBytes()); - MqttMessage message = new MqttMessage(topic, payload); + MqttMessage message = new MqttMessage(topic, payloadJson.getBytes()); return connection.publish(message, qos, false); } @@ -132,9 +130,7 @@ public CompletableFuture PublishGetShadow( } topic = topic.replace("{thingName}", request.thingName); String payloadJson = gson.toJson(request); - ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length()); - payload.put(payloadJson.getBytes()); - MqttMessage message = new MqttMessage(topic, payload); + MqttMessage message = new MqttMessage(topic, payloadJson.getBytes()); return connection.publish(message, qos, false); } @@ -152,7 +148,7 @@ public CompletableFuture SubscribeToShadowDeltaUpdatedEvents( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); ShadowDeltaUpdatedEvent response = gson.fromJson(payload, ShadowDeltaUpdatedEvent.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -185,7 +181,7 @@ public CompletableFuture SubscribeToUpdateShadowAccepted( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); UpdateShadowResponse response = gson.fromJson(payload, UpdateShadowResponse.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -214,10 +210,8 @@ public CompletableFuture PublishDeleteShadow( return result; } topic = topic.replace("{thingName}", request.thingName); - String payloadJson = gson.toJson(request); - ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length()); - payload.put(payloadJson.getBytes()); - MqttMessage message = new MqttMessage(topic, payload); + String payloadJson = gson.toJson(request); + MqttMessage message = new MqttMessage(topic, payloadJson.getBytes()); return connection.publish(message, qos, false); } @@ -235,7 +229,7 @@ public CompletableFuture SubscribeToDeleteShadowAccepted( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); DeleteShadowResponse response = gson.fromJson(payload, DeleteShadowResponse.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -268,7 +262,7 @@ public CompletableFuture SubscribeToGetShadowAccepted( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); GetShadowResponse response = gson.fromJson(payload, GetShadowResponse.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -301,7 +295,7 @@ public CompletableFuture SubscribeToShadowUpdatedEvents( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); ShadowUpdatedEvent response = gson.fromJson(payload, ShadowUpdatedEvent.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -334,7 +328,7 @@ public CompletableFuture SubscribeToDeleteShadowRejected( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); ErrorResponse response = gson.fromJson(payload, ErrorResponse.class); handler.accept(response); } catch (UnsupportedEncodingException ex) { @@ -367,7 +361,7 @@ public CompletableFuture SubscribeToGetShadowRejected( topic = topic.replace("{thingName}", request.thingName); Consumer messageHandler = (message) -> { try { - String payload = new String(message.getPayload().array(), "UTF-8"); + String payload = new String(message.getPayload(), "UTF-8"); ErrorResponse response = gson.fromJson(payload, ErrorResponse.class); handler.accept(response); } catch (UnsupportedEncodingException ex) {