From 11eb0935ee185a82be8d89e0dce3c1dba07813af Mon Sep 17 00:00:00 2001 From: liwei Date: Sat, 13 Jul 2024 14:19:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=B9=E9=87=8F=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocketmq/base/batch/BatchConsumer.java | 25 ++++++++++++++++++ .../rocketmq/base/batch/BatchProducer.java | 26 +++++++++++++++++-- 2 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/zerroi/rocketmq/base/batch/BatchConsumer.java diff --git a/src/main/java/com/zerroi/rocketmq/base/batch/BatchConsumer.java b/src/main/java/com/zerroi/rocketmq/base/batch/BatchConsumer.java new file mode 100644 index 0000000..b7180b2 --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/batch/BatchConsumer.java @@ -0,0 +1,25 @@ +package com.zerroi.rocketmq.base.batch; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; + +@Slf4j +public class BatchConsumer { + public static void main(String[] args) throws MQClientException { + DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1"); + defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + + defaultMQPushConsumer.subscribe("BatchTopic", "tag1"); + + defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> { + messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody()))); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + + log.info("消费者启动"); + defaultMQPushConsumer.start(); + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/batch/BatchProducer.java b/src/main/java/com/zerroi/rocketmq/base/batch/BatchProducer.java index 2c60a3c..16ffb19 100644 --- a/src/main/java/com/zerroi/rocketmq/base/batch/BatchProducer.java +++ b/src/main/java/com/zerroi/rocketmq/base/batch/BatchProducer.java @@ -1,7 +1,29 @@ package com.zerroi.rocketmq.base.batch; -public class BatchProducer { - public static void main(String[] args) { +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import java.util.ArrayList; +import java.util.List; + +public class BatchProducer { + public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("group1"); + + producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + producer.start(); + List messageList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + Message message = new Message("BatchTopic", "tag1", ("hello world" + i).getBytes()); + messageList.add(message); + } + SendResult sendResult = producer.send(messageList); + + System.out.println("发送结果" + sendResult); + producer.shutdown(); } }