diff --git a/src/main/java/com/zerroi/rocketmq/base/batch/BatchProducer.java b/src/main/java/com/zerroi/rocketmq/base/batch/BatchProducer.java new file mode 100644 index 0000000..2c60a3c --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/batch/BatchProducer.java @@ -0,0 +1,7 @@ +package com.zerroi.rocketmq.base.batch; + +public class BatchProducer { + public static void main(String[] args) { + + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java b/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java index c424d80..703a64c 100644 --- a/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java +++ b/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java @@ -7,7 +7,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; public class MessageConsumer { - public static void main(String[] args) throws MQClientException, InterruptedException { + public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1"); defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); @@ -21,7 +21,5 @@ public class MessageConsumer { }); defaultMQPushConsumer.start(); - // TimeUnit.SECONDS.sleep(2); - // defaultMQPushConsumer.shutdown(); } } diff --git a/src/main/java/com/zerroi/rocketmq/base/delay/DelayConsumer.java b/src/main/java/com/zerroi/rocketmq/base/delay/DelayConsumer.java new file mode 100644 index 0000000..cf19955 --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/delay/DelayConsumer.java @@ -0,0 +1,27 @@ +package com.zerroi.rocketmq.base.delay; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; + +@Slf4j +public class DelayConsumer { + public static void main(String[] args) throws MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); + consumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + consumer.subscribe("DelayTop", "*"); + + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + log.info("receive message id: {}; delay time: {}", msg.getMsgId(), + System.currentTimeMillis() - msg.getStoreTimestamp()); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + + consumer.start(); + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/delay/DelayProducer.java b/src/main/java/com/zerroi/rocketmq/base/delay/DelayProducer.java new file mode 100644 index 0000000..9672081 --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/delay/DelayProducer.java @@ -0,0 +1,29 @@ +package com.zerroi.rocketmq.base.delay; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.concurrent.TimeUnit; + +public class DelayProducer { + public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { + DefaultMQProducer producer = new DefaultMQProducer("group1"); + + producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + producer.start(); + + for (int i = 0; i < 10; i++) { + Message message = new Message("DelayTop", "tag1", ("hello world" + i).getBytes()); + // 设置延迟等级 + message.setDelayTimeLevel(2); + SendResult sendResult = producer.send(message); + System.out.println("发送结果" + sendResult); + TimeUnit.SECONDS.sleep(1); + } + producer.shutdown(); + } +}