Skip to content

Commit

Permalink
Updated to CRT v0.4.2, switched ByteBuffers to byte[] in MQTT (#22)
Browse files Browse the repository at this point in the history
* Updated to CRT v0.4.2, switched ByteBuffers to byte[] in MQTT
  • Loading branch information
Justin Boswell authored Nov 6, 2019
1 parent 6e8f8cd commit a475fa1
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 51 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,5 @@ $RECYCLE.BIN/

.project
bin/
.classpath
.settings
7 changes: 2 additions & 5 deletions samples/BasicPubSub/src/main/java/pubsub/PubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.EventLoopGroup;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.io.TlsContextOptions;
import software.amazon.awssdk.crt.mqtt.MqttClient;
Expand Down Expand Up @@ -169,7 +168,7 @@ public void onConnectionResumed(boolean sessionPresent) {

CompletableFuture<Integer> subscribed = connection.subscribe(topic, QualityOfService.AT_LEAST_ONCE, (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
System.out.println("MESSAGE: " + payload);
} catch (UnsupportedEncodingException ex) {
System.out.println("Unable to decode payload: " + ex.getMessage());
Expand All @@ -180,9 +179,7 @@ public void onConnectionResumed(boolean sessionPresent) {

int count = 0;
while (count++ < messagesToPublish) {
ByteBuffer payload = ByteBuffer.allocateDirect(message.length());
payload.put(message.getBytes());
CompletableFuture<Integer> published = connection.publish(new MqttMessage(topic, payload), QualityOfService.AT_LEAST_ONCE, false);
CompletableFuture<Integer> published = connection.publish(new MqttMessage(topic, message.getBytes()), QualityOfService.AT_LEAST_ONCE, false);
published.get();
Thread.sleep(1000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void onConnectionResumed(boolean sessionPresent) {
String clientTopic = String.format("%s%d", topic, i);
connectionState.subscribeFuture = connectionState.connection.subscribe(clientTopic, QualityOfService.AT_LEAST_ONCE, (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
System.out.println(String.format("(Topic %s): MESSAGE: %s", clientTopic, payload));
} catch (UnsupportedEncodingException ex) {
System.out.println(String.format("(Topic %s): Unable to decode payload: %s", clientTopic, ex.getMessage()));
Expand Down Expand Up @@ -307,8 +307,6 @@ public static void main(String[] args) {

for(int count = 0; count < messagesToPublish; ++count) {
String messageContent = String.format("%s #%d", message, count + 1);
ByteBuffer payload = ByteBuffer.allocateDirect(messageContent.length());
payload.put(messageContent.getBytes());

// Pick a random connection to publish from
int connectionIndex = validIndices.get(Math.abs(rng.nextInt()) % validIndices.size());
Expand All @@ -319,7 +317,7 @@ public static void main(String[] args) {
int topicIndex = validIndices.get(Math.abs(rng.nextInt()) % validIndices.size());
String publishTopic = String.format("%s%d", topic, topicIndex);

publishFutures.add(connection.publish(new MqttMessage(publishTopic, payload), QualityOfService.AT_LEAST_ONCE, false));
publishFutures.add(connection.publish(new MqttMessage(publishTopic, messageContent.getBytes()), QualityOfService.AT_LEAST_ONCE, false));

if (count % PROGRESS_OP_COUNT == 0) {
System.out.println(String.format("(Main Thread) Message publish count: %d", count));
Expand Down
1 change: 0 additions & 1 deletion samples/Shadow/src/main/java/shadow/ShadowSample.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.EventLoopGroup;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.io.TlsContextOptions;
import software.amazon.awssdk.crt.mqtt.MqttClient;
Expand Down
2 changes: 1 addition & 1 deletion sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<version>0.3.29</version>
<version>0.4.2</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public CompletableFuture<Integer> SubscribeToJobExecutionsChangedEvents(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
JobExecutionsChangedEvent response = gson.fromJson(payload, JobExecutionsChangedEvent.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -124,7 +124,7 @@ public CompletableFuture<Integer> SubscribeToStartNextPendingJobExecutionAccepte
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
StartNextJobExecutionResponse response = gson.fromJson(payload, StartNextJobExecutionResponse.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -163,7 +163,7 @@ public CompletableFuture<Integer> SubscribeToDescribeJobExecutionRejected(
topic = topic.replace("{jobId}", request.jobId);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
RejectedError response = gson.fromJson(payload, RejectedError.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -196,7 +196,7 @@ public CompletableFuture<Integer> SubscribeToNextJobExecutionChangedEvents(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
NextJobExecutionChangedEvent response = gson.fromJson(payload, NextJobExecutionChangedEvent.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -235,7 +235,7 @@ public CompletableFuture<Integer> SubscribeToUpdateJobExecutionRejected(
topic = topic.replace("{jobId}", request.jobId);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
RejectedError response = gson.fromJson(payload, RejectedError.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -274,7 +274,7 @@ public CompletableFuture<Integer> SubscribeToUpdateJobExecutionAccepted(
topic = topic.replace("{jobId}", request.jobId);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
UpdateJobExecutionResponse response = gson.fromJson(payload, UpdateJobExecutionResponse.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -310,9 +310,7 @@ public CompletableFuture<Integer> PublishUpdateJobExecution(
}
topic = topic.replace("{jobId}", request.jobId);
String payloadJson = gson.toJson(request);
ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length());
payload.put(payloadJson.getBytes());
MqttMessage message = new MqttMessage(topic, payload);
MqttMessage message = new MqttMessage(topic, payloadJson.getBytes());
return connection.publish(message, qos, false);
}

Expand All @@ -336,7 +334,7 @@ public CompletableFuture<Integer> SubscribeToDescribeJobExecutionAccepted(
topic = topic.replace("{jobId}", request.jobId);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
DescribeJobExecutionResponse response = gson.fromJson(payload, DescribeJobExecutionResponse.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -366,9 +364,7 @@ public CompletableFuture<Integer> PublishGetPendingJobExecutions(
}
topic = topic.replace("{thingName}", request.thingName);
String payloadJson = gson.toJson(request);
ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length());
payload.put(payloadJson.getBytes());
MqttMessage message = new MqttMessage(topic, payload);
MqttMessage message = new MqttMessage(topic, payloadJson.getBytes());
return connection.publish(message, qos, false);
}

Expand All @@ -386,7 +382,7 @@ public CompletableFuture<Integer> SubscribeToGetPendingJobExecutionsAccepted(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
GetPendingJobExecutionsResponse response = gson.fromJson(payload, GetPendingJobExecutionsResponse.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -419,7 +415,7 @@ public CompletableFuture<Integer> SubscribeToStartNextPendingJobExecutionRejecte
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
RejectedError response = gson.fromJson(payload, RejectedError.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -452,7 +448,7 @@ public CompletableFuture<Integer> SubscribeToGetPendingJobExecutionsRejected(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
RejectedError response = gson.fromJson(payload, RejectedError.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -482,9 +478,7 @@ public CompletableFuture<Integer> PublishStartNextPendingJobExecution(
}
topic = topic.replace("{thingName}", request.thingName);
String payloadJson = gson.toJson(request);
ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length());
payload.put(payloadJson.getBytes());
MqttMessage message = new MqttMessage(topic, payload);
MqttMessage message = new MqttMessage(topic, payloadJson.getBytes());
return connection.publish(message, qos, false);
}

Expand All @@ -505,9 +499,7 @@ public CompletableFuture<Integer> PublishDescribeJobExecution(
}
topic = topic.replace("{thingName}", request.thingName);
String payloadJson = gson.toJson(request);
ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length());
payload.put(payloadJson.getBytes());
MqttMessage message = new MqttMessage(topic, payload);
MqttMessage message = new MqttMessage(topic, payloadJson.getBytes());
return connection.publish(message, qos, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public CompletableFuture<Integer> SubscribeToUpdateShadowRejected(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
ErrorResponse response = gson.fromJson(payload, ErrorResponse.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -115,9 +115,7 @@ public CompletableFuture<Integer> PublishUpdateShadow(
}
topic = topic.replace("{thingName}", request.thingName);
String payloadJson = gson.toJson(request);
ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length());
payload.put(payloadJson.getBytes());
MqttMessage message = new MqttMessage(topic, payload);
MqttMessage message = new MqttMessage(topic, payloadJson.getBytes());
return connection.publish(message, qos, false);
}

Expand All @@ -132,9 +130,7 @@ public CompletableFuture<Integer> PublishGetShadow(
}
topic = topic.replace("{thingName}", request.thingName);
String payloadJson = gson.toJson(request);
ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length());
payload.put(payloadJson.getBytes());
MqttMessage message = new MqttMessage(topic, payload);
MqttMessage message = new MqttMessage(topic, payloadJson.getBytes());
return connection.publish(message, qos, false);
}

Expand All @@ -152,7 +148,7 @@ public CompletableFuture<Integer> SubscribeToShadowDeltaUpdatedEvents(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
ShadowDeltaUpdatedEvent response = gson.fromJson(payload, ShadowDeltaUpdatedEvent.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -185,7 +181,7 @@ public CompletableFuture<Integer> SubscribeToUpdateShadowAccepted(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
UpdateShadowResponse response = gson.fromJson(payload, UpdateShadowResponse.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -214,10 +210,8 @@ public CompletableFuture<Integer> PublishDeleteShadow(
return result;
}
topic = topic.replace("{thingName}", request.thingName);
String payloadJson = gson.toJson(request);
ByteBuffer payload = ByteBuffer.allocateDirect(payloadJson.length());
payload.put(payloadJson.getBytes());
MqttMessage message = new MqttMessage(topic, payload);
String payloadJson = gson.toJson(request);
MqttMessage message = new MqttMessage(topic, payloadJson.getBytes());
return connection.publish(message, qos, false);
}

Expand All @@ -235,7 +229,7 @@ public CompletableFuture<Integer> SubscribeToDeleteShadowAccepted(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
DeleteShadowResponse response = gson.fromJson(payload, DeleteShadowResponse.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -268,7 +262,7 @@ public CompletableFuture<Integer> SubscribeToGetShadowAccepted(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
GetShadowResponse response = gson.fromJson(payload, GetShadowResponse.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -301,7 +295,7 @@ public CompletableFuture<Integer> SubscribeToShadowUpdatedEvents(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
ShadowUpdatedEvent response = gson.fromJson(payload, ShadowUpdatedEvent.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -334,7 +328,7 @@ public CompletableFuture<Integer> SubscribeToDeleteShadowRejected(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
ErrorResponse response = gson.fromJson(payload, ErrorResponse.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down Expand Up @@ -367,7 +361,7 @@ public CompletableFuture<Integer> SubscribeToGetShadowRejected(
topic = topic.replace("{thingName}", request.thingName);
Consumer<MqttMessage> messageHandler = (message) -> {
try {
String payload = new String(message.getPayload().array(), "UTF-8");
String payload = new String(message.getPayload(), "UTF-8");
ErrorResponse response = gson.fromJson(payload, ErrorResponse.class);
handler.accept(response);
} catch (UnsupportedEncodingException ex) {
Expand Down

0 comments on commit a475fa1

Please sign in to comment.