批量消息

This commit is contained in:
liwei 2024-07-13 14:19:35 +08:00
parent e7ea8b3dc3
commit 11eb0935ee
2 changed files with 49 additions and 2 deletions

View File

@ -0,0 +1,25 @@
package com.zerroi.rocketmq.base.batch;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
@Slf4j
public class BatchConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("BatchTopic", "tag1");
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
log.info("消费者启动");
defaultMQPushConsumer.start();
}
}

View File

@ -1,7 +1,29 @@
package com.zerroi.rocketmq.base.batch; package com.zerroi.rocketmq.base.batch;
public class BatchProducer { import org.apache.rocketmq.client.exception.MQBrokerException;
public static void main(String[] args) { import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.ArrayList;
import java.util.List;
public class BatchProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message message = new Message("BatchTopic", "tag1", ("hello world" + i).getBytes());
messageList.add(message);
}
SendResult sendResult = producer.send(messageList);
System.out.println("发送结果" + sendResult);
producer.shutdown();
} }
} }