diff --git a/src/main/java/com/zerroi/rocketmq/base/filter/FilterConsumer.java b/src/main/java/com/zerroi/rocketmq/base/filter/FilterConsumer.java new file mode 100644 index 0000000..66b9ae0 --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/filter/FilterConsumer.java @@ -0,0 +1,23 @@ +package com.zerroi.rocketmq.base.filter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; + +@Slf4j +public class FilterConsumer { + public static void main(String[] args) throws MQClientException { + DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("FilterProducer"); + defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + defaultMQPushConsumer.subscribe("FilterTopic", "tag1 || tag2"); + defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> { + messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody()))); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + + log.info("消费者启动成功"); + defaultMQPushConsumer.start(); + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/filter/FilterProducer.java b/src/main/java/com/zerroi/rocketmq/base/filter/FilterProducer.java new file mode 100644 index 0000000..550f0d3 --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/filter/FilterProducer.java @@ -0,0 +1,35 @@ +package com.zerroi.rocketmq.base.filter; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.concurrent.TimeUnit; + +public class FilterProducer { + public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("FilterProducer"); + producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + producer.start(); + + for (int i = 0; i < 5; i++) { + Message message = new Message("FilterTopic", "tag2", ("hello world" + i).getBytes()); + producer.send(message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + System.out.println("异步发送成功" + sendResult); + } + + @Override + public void onException(Throwable e) { + System.out.println("异步发送异常" + e); + } + }); + TimeUnit.SECONDS.sleep(1); + } + producer.shutdown(); + } +}