Skip to content

Commit

Permalink
POC compressMsgBodyThreshold vs compressMessageBodyThreshold
Browse files Browse the repository at this point in the history
  • Loading branch information
dyrnq committed Oct 15, 2024
1 parent e6cca74 commit 2c37a2d
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ public void run(ApplicationArguments args) throws Exception {
// 获取 Queue ID
int queueId = msg.getQueueId();


log.info("storeHost: {}, Broker: {} , Queue ID: {} , Message ID: {} , Received message: {}", msg.getStoreHost().toString(), brokerName, queueId, messageId, messageBody);
int sysFlag = msg.getSysFlag();
boolean isCompressed = (sysFlag & 0x1) != 0;
log.info("isCompressed: {}, storeHost: {}, Broker: {} , Queue ID: {} , Message ID: {} , Received message: {}", isCompressed, msg.getStoreHost().toString(), brokerName, queueId, messageId, messageBody);
}
// return null; // 返回消费状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 返回消费成功状态,自动提交
Expand Down
52 changes: 42 additions & 10 deletions src/main/java/com/dyrnq/sca/rocketmq/StreamProducerRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
@AllArgsConstructor
@Slf4j
Expand All @@ -20,6 +18,17 @@ public class StreamProducerRunner implements ApplicationRunner {
private static final String MESSAGE_TAG = "test";
private final StreamBridge streamBridge;

public static String genFixedString(int charCount) {
byte[] byteArray = new byte[charCount];

for (int i = 0; i < byteArray.length; i++) {
byteArray[i] = 'A';
}

String str = new String(byteArray);
return str;
}

@Override
public void run(ApplicationArguments args) throws Exception {

Expand All @@ -31,14 +40,37 @@ public void run(ApplicationArguments args) throws Exception {
// streamBridge.setAsync(true);
while (true) {

MessageInfo message = MessageInfo.builder().build();
String body = UUID.randomUUID().toString();
message.setBody(body);
message.setIndex(index);
// MessageInfo message = MessageInfo.builder().build();
// String body = UUID.randomUUID().toString();
// body = genFixedString(5000);
// message.setBody(body);
// message.setIndex(index);
//
// // 创建 Spring Message 对象
// Message<MessageInfo> info = MessageBuilder.withPayload(message)
// .setHeader(MessageConst.PROPERTY_TAGS, MESSAGE_TAG) // <1> 设置 Tag
// .build();


String myStr = """
{"body":"%s",index":%d}
""";

String outStr = String.format(myStr, "", index).trim();
int increase = 1;
int wantBodySize = 4095;

if (index % 7 == 0) {
wantBodySize = 4096;
}
while (outStr.getBytes().length < wantBodySize) {
outStr = String.format(myStr, genFixedString(increase), index).trim();
increase++;
}
// log.warn("bytes length:{} ", outStr.getBytes().length);

// 创建 Spring Message 对象
Message<MessageInfo> info = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TAGS, MESSAGE_TAG) // <1> 设置 Tag
Message<String> info = MessageBuilder.withPayload(outStr)
.setHeader(MessageConst.PROPERTY_TAGS, MESSAGE_TAG)
.build();
streamBridge.send(CONSUMER_EVENT_OUT_0, info);

Expand All @@ -50,7 +82,7 @@ public void run(ApplicationArguments args) throws Exception {
}
//ThreadUtils.sleep(Duration.ofMillis(2000));

if (index > 10 * 10000) {
if (index > 10 * 10) {
break;
}
}
Expand Down
16 changes: 10 additions & 6 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@ spring:
# https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ#rocketmq-provider-properties
producer:
sync: false
# compressMsgBodyOverHowmuch:
compressMessageBodyThreshold: 900009
compressMsgBodyThreshold: 4096
# sendMsgTimeout: 1
# sendMessageTimeout: 6000
# retryTimesWhenSendFailed: 10
# retryTimesWhenSendAsyncFailed: 10
# retryTimesWhenSendFailed: 10
# retryTimesWhenSendAsyncFailed: 10
bindings:
consumerEvent-out-0:
destination: demo
#content-type: text/plain;charset=UTF-8
content-type: application/json
content-type: text/plain;charset=UTF-8
# content-type: application/json
group: demo-group

consumerEvent-in-0:
destination: demo
#content-type: text/plain;charset=UTF-8
content-type: application/json
content-type: text/plain;charset=UTF-8
# content-type: application/json
group: demo-group
consumer:
concurrency: 3
Expand Down

0 comments on commit 2c37a2d

Please sign in to comment.