filter by sql

This commit is contained in:
liwei 2024-07-13 14:36:24 +08:00
parent f6573b355f
commit b0f2b28281
2 changed files with 6 additions and 3 deletions

View File

@ -2,6 +2,7 @@ package com.zerroi.rocketmq.base.filter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
@ -11,7 +12,7 @@ public class FilterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("FilterProducer");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("FilterTopic", "tag1 || tag2");
defaultMQPushConsumer.subscribe("FilterSqlTopic", MessageSelector.bySql("sequenceId > 5"));
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

View File

@ -15,8 +15,10 @@ public class FilterProducer {
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 5; i++) {
Message message = new Message("FilterTopic", "tag2", ("hello world" + i).getBytes());
for (int i = 0; i < 10; i++) {
Message message = new Message("FilterSqlTopic", "tag2", ("hello world" + i).getBytes());
message.putUserProperty("sequenceId", String.valueOf(i));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {