tag filter

This commit is contained in:
liwei 2024-07-13 14:30:40 +08:00
parent 11eb0935ee
commit f6573b355f
2 changed files with 58 additions and 0 deletions

View File

@ -0,0 +1,23 @@
package com.zerroi.rocketmq.base.filter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
@Slf4j
public class FilterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("FilterProducer");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("FilterTopic", "tag1 || tag2");
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
log.info("消费者启动成功");
defaultMQPushConsumer.start();
}
}

View File

@ -0,0 +1,35 @@
package com.zerroi.rocketmq.base.filter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class FilterProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("FilterProducer");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 5; i++) {
Message message = new Message("FilterTopic", "tag2", ("hello world" + i).getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送成功" + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("异步发送异常" + e);
}
});
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}