Skip to content

Commit

Permalink
Add client simple test case.
Browse files Browse the repository at this point in the history
  • Loading branch information
dyrnq committed Nov 11, 2024
1 parent 9a971da commit a913457
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.dyrnq.rocketmq.simple;


import com.dyrnq.rocketmq.ClientCreater;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

/**
* Description: 消息消费者(push)
*/
public class SimplePushConsumer {


/**
* topic名称
*/
private static final String TOPIC_NAME = "topic_a";

/**
* 消费者组名称
*/
private static final String GROUP_NAME = "group";

public static void main(String[] args) throws Exception {
// 创建消息消费者
DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);
// 订阅topic
pushConsumer.subscribe(TOPIC_NAME, "*");
// 注册回调实现类来处理从broker拉取回来的消息
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 消息处理逻辑
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
System.out.printf("Consumer Started.%n");
pushConsumer.start();
System.in.read();
pushConsumer.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.dyrnq.rocketmq.simple;

import com.dyrnq.rocketmq.ClientCreater;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

public class SimpleSyncProducer {

/**
* topic名称
*/
private static final String TOPIC_NAME = "topic_a";

/**
* 生产者组名称
*/
private static final String GROUP_NAME = "group";


public static void main(String[] args) throws Exception {
// 创建消息生产者
DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);
// 创建消息实例,设置topic和消息内容
Message msg = new Message(TOPIC_NAME, "yourMessageTagA", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
// 发送消息
SendResult sendResult = producer.send(msg, 3000);
System.out.println(sendResult + ":" + new String(msg.getBody()));
producer.shutdown();
}
}

0 comments on commit a913457

Please sign in to comment.