From 2c37a2dfeeb29028f7f38bca3524279337220373 Mon Sep 17 00:00:00 2001 From: dyrnq Date: Tue, 15 Oct 2024 20:55:47 +0800 Subject: [PATCH] POC compressMsgBodyThreshold vs compressMessageBodyThreshold https://github.com/spring-cloud-alibaba-group/spring-cloud-alibaba-group.github.io/issues/289 --- .../sca/rocketmq/StreamConsumerRunner.java | 5 +- .../sca/rocketmq/StreamProducerRunner.java | 52 +++++++++++++++---- src/main/resources/application.yaml | 16 +++--- 3 files changed, 55 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/dyrnq/sca/rocketmq/StreamConsumerRunner.java b/src/main/java/com/dyrnq/sca/rocketmq/StreamConsumerRunner.java index e3aa12b..40dde51 100644 --- a/src/main/java/com/dyrnq/sca/rocketmq/StreamConsumerRunner.java +++ b/src/main/java/com/dyrnq/sca/rocketmq/StreamConsumerRunner.java @@ -45,8 +45,9 @@ public void run(ApplicationArguments args) throws Exception { // 获取 Queue ID int queueId = msg.getQueueId(); - - log.info("storeHost: {}, Broker: {} , Queue ID: {} , Message ID: {} , Received message: {}", msg.getStoreHost().toString(), brokerName, queueId, messageId, messageBody); + int sysFlag = msg.getSysFlag(); + boolean isCompressed = (sysFlag & 0x1) != 0; + log.info("isCompressed: {}, storeHost: {}, Broker: {} , Queue ID: {} , Message ID: {} , Received message: {}", isCompressed, msg.getStoreHost().toString(), brokerName, queueId, messageId, messageBody); } // return null; // 返回消费状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 返回消费成功状态,自动提交 diff --git a/src/main/java/com/dyrnq/sca/rocketmq/StreamProducerRunner.java b/src/main/java/com/dyrnq/sca/rocketmq/StreamProducerRunner.java index 00a234d..5e6ece8 100644 --- a/src/main/java/com/dyrnq/sca/rocketmq/StreamProducerRunner.java +++ b/src/main/java/com/dyrnq/sca/rocketmq/StreamProducerRunner.java @@ -10,8 +10,6 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; -import java.util.UUID; - @Component @AllArgsConstructor @Slf4j @@ -20,6 +18,17 @@ public class StreamProducerRunner implements ApplicationRunner { private static final String MESSAGE_TAG = "test"; private final StreamBridge streamBridge; + public static String genFixedString(int charCount) { + byte[] byteArray = new byte[charCount]; + + for (int i = 0; i < byteArray.length; i++) { + byteArray[i] = 'A'; + } + + String str = new String(byteArray); + return str; + } + @Override public void run(ApplicationArguments args) throws Exception { @@ -31,14 +40,37 @@ public void run(ApplicationArguments args) throws Exception { // streamBridge.setAsync(true); while (true) { - MessageInfo message = MessageInfo.builder().build(); - String body = UUID.randomUUID().toString(); - message.setBody(body); - message.setIndex(index); +// MessageInfo message = MessageInfo.builder().build(); +// String body = UUID.randomUUID().toString(); +// body = genFixedString(5000); +// message.setBody(body); +// message.setIndex(index); +// +// // 创建 Spring Message 对象 +// Message info = MessageBuilder.withPayload(message) +// .setHeader(MessageConst.PROPERTY_TAGS, MESSAGE_TAG) // <1> 设置 Tag +// .build(); + + + String myStr = """ + {"body":"%s",index":%d} + """; + + String outStr = String.format(myStr, "", index).trim(); + int increase = 1; + int wantBodySize = 4095; + + if (index % 7 == 0) { + wantBodySize = 4096; + } + while (outStr.getBytes().length < wantBodySize) { + outStr = String.format(myStr, genFixedString(increase), index).trim(); + increase++; + } +// log.warn("bytes length:{} ", outStr.getBytes().length); - // 创建 Spring Message 对象 - Message info = MessageBuilder.withPayload(message) - .setHeader(MessageConst.PROPERTY_TAGS, MESSAGE_TAG) // <1> 设置 Tag + Message info = MessageBuilder.withPayload(outStr) + .setHeader(MessageConst.PROPERTY_TAGS, MESSAGE_TAG) .build(); streamBridge.send(CONSUMER_EVENT_OUT_0, info); @@ -50,7 +82,7 @@ public void run(ApplicationArguments args) throws Exception { } //ThreadUtils.sleep(Duration.ofMillis(2000)); - if (index > 10 * 10000) { + if (index > 10 * 10) { break; } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 991258f..4b23ccd 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -18,20 +18,24 @@ spring: # https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ#rocketmq-provider-properties producer: sync: false + # compressMsgBodyOverHowmuch: + compressMessageBodyThreshold: 900009 + compressMsgBodyThreshold: 4096 +# sendMsgTimeout: 1 # sendMessageTimeout: 6000 -# retryTimesWhenSendFailed: 10 -# retryTimesWhenSendAsyncFailed: 10 + # retryTimesWhenSendFailed: 10 + # retryTimesWhenSendAsyncFailed: 10 bindings: consumerEvent-out-0: destination: demo - #content-type: text/plain;charset=UTF-8 - content-type: application/json + content-type: text/plain;charset=UTF-8 + # content-type: application/json group: demo-group consumerEvent-in-0: destination: demo - #content-type: text/plain;charset=UTF-8 - content-type: application/json + content-type: text/plain;charset=UTF-8 + # content-type: application/json group: demo-group consumer: concurrency: 3