延迟消息

This commit is contained in:
liwei 2024-07-13 14:11:15 +08:00
parent 61c0539c4c
commit e7ea8b3dc3
4 changed files with 64 additions and 3 deletions

View File

@ -0,0 +1,7 @@
package com.zerroi.rocketmq.base.batch;
public class BatchProducer {
public static void main(String[] args) {
}
}

View File

@ -7,7 +7,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
public class MessageConsumer { public class MessageConsumer {
public static void main(String[] args) throws MQClientException, InterruptedException { public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
@ -21,7 +21,5 @@ public class MessageConsumer {
}); });
defaultMQPushConsumer.start(); defaultMQPushConsumer.start();
// TimeUnit.SECONDS.sleep(2);
// defaultMQPushConsumer.shutdown();
} }
} }

View File

@ -0,0 +1,27 @@
package com.zerroi.rocketmq.base.delay;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
@Slf4j
public class DelayConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
consumer.subscribe("DelayTop", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
log.info("receive message id: {}; delay time: {}", msg.getMsgId(),
System.currentTimeMillis() - msg.getStoreTimestamp());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

View File

@ -0,0 +1,29 @@
package com.zerroi.rocketmq.base.delay;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class DelayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("DelayTop", "tag1", ("hello world" + i).getBytes());
// 设置延迟等级
message.setDelayTimeLevel(2);
SendResult sendResult = producer.send(message);
System.out.println("发送结果" + sendResult);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}