From b0f2b28281a1ba8c9b9b327df3db6165d7bae4e5 Mon Sep 17 00:00:00 2001 From: liwei Date: Sat, 13 Jul 2024 14:36:24 +0800 Subject: [PATCH] filter by sql --- .../com/zerroi/rocketmq/base/filter/FilterConsumer.java | 3 ++- .../com/zerroi/rocketmq/base/filter/FilterProducer.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/zerroi/rocketmq/base/filter/FilterConsumer.java b/src/main/java/com/zerroi/rocketmq/base/filter/FilterConsumer.java index 66b9ae0..deb0f2d 100644 --- a/src/main/java/com/zerroi/rocketmq/base/filter/FilterConsumer.java +++ b/src/main/java/com/zerroi/rocketmq/base/filter/FilterConsumer.java @@ -2,6 +2,7 @@ package com.zerroi.rocketmq.base.filter; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; @@ -11,7 +12,7 @@ public class FilterConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("FilterProducer"); defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); - defaultMQPushConsumer.subscribe("FilterTopic", "tag1 || tag2"); + defaultMQPushConsumer.subscribe("FilterSqlTopic", MessageSelector.bySql("sequenceId > 5")); defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> { messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody()))); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; diff --git a/src/main/java/com/zerroi/rocketmq/base/filter/FilterProducer.java b/src/main/java/com/zerroi/rocketmq/base/filter/FilterProducer.java index 550f0d3..766c0ba 100644 --- a/src/main/java/com/zerroi/rocketmq/base/filter/FilterProducer.java +++ b/src/main/java/com/zerroi/rocketmq/base/filter/FilterProducer.java @@ -15,8 +15,10 @@ public class FilterProducer { producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); producer.start(); - for (int i = 0; i < 5; i++) { - Message message = new Message("FilterTopic", "tag2", ("hello world" + i).getBytes()); + for (int i = 0; i < 10; i++) { + Message message = new Message("FilterSqlTopic", "tag2", ("hello world" + i).getBytes()); + message.putUserProperty("sequenceId", String.valueOf(i)); + producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) {