commit 8b8237d9ab61fa1d580c06eb930cf8472b04e3dd Author: liwei Date: Sat Jul 13 13:22:07 2024 +0800 rocket mq init diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..26b4b88 --- /dev/null +++ b/pom.xml @@ -0,0 +1,29 @@ + + + 4.0.0 + + com.zerroi.rocketmq + rocketmq-study + 1.0-SNAPSHOT + + + 17 + 17 + UTF-8 + + + + org.apache.rocketmq + rocketmq-client + 5.2.0 + + + org.projectlombok + lombok + 1.18.32 + + + + \ No newline at end of file diff --git a/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java b/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java new file mode 100644 index 0000000..c424d80 --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java @@ -0,0 +1,27 @@ +package com.zerroi.rocketmq.base.consumer; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; + +public class MessageConsumer { + public static void main(String[] args) throws MQClientException, InterruptedException { + DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1"); + defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + + defaultMQPushConsumer.subscribe("base", "tag1"); + // 广播模式 + defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING); + + defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> { + messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody()))); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + + defaultMQPushConsumer.start(); + // TimeUnit.SECONDS.sleep(2); + // defaultMQPushConsumer.shutdown(); + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/consumer/OrderProducer.java b/src/main/java/com/zerroi/rocketmq/base/consumer/OrderProducer.java new file mode 100644 index 0000000..4bc154f --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/consumer/OrderProducer.java @@ -0,0 +1,35 @@ +package com.zerroi.rocketmq.base.consumer; + +import com.zerroi.rocketmq.base.order.OrderStep; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; + +public class OrderProducer { + public static void main(String[] args) throws MQClientException { + DefaultMQProducer producer = new DefaultMQProducer("group1"); + + producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + producer.start(); + + List list = OrderStep.buildOrders(); + list.forEach(orderStep -> { + String body = orderStep + ""; + Message msg = new Message("OrderTopic", "OrderTag", "i" + orderStep.getOrderId(), body.getBytes()); + try { + producer.send(msg, new MessageQueueSelector() { + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + return null; + } + }, orderStep.getOrderId()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/order/OrderStep.java b/src/main/java/com/zerroi/rocketmq/base/order/OrderStep.java new file mode 100644 index 0000000..a818e5a --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/order/OrderStep.java @@ -0,0 +1,43 @@ +package com.zerroi.rocketmq.base.order; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +import java.util.ArrayList; +import java.util.List; + +@Data +@ToString +@AllArgsConstructor +@NoArgsConstructor +public class OrderStep { + private long orderId; + private String desc; + + public static List buildOrders() { + List list = new ArrayList<>(); + OrderStep orderStep = new OrderStep(); + // id: 1039: 创建 付款 推送 完成 + // id: 1065: 创建 付款 + // id: 7236 : 创建 付款 + // Order 1039 + list.add(createOrderStep(1039, "创建")); + list.add(createOrderStep(1065, "创建")); + list.add(createOrderStep(1039, "付款")); + list.add(createOrderStep(7236, "创建")); + list.add(createOrderStep(1065, "付款")); + list.add(createOrderStep(7236, "付款")); + list.add(createOrderStep(1039, "推送")); + list.add(createOrderStep(1039, "完成")); + + return list; + } + private static OrderStep createOrderStep(long orderId, String desc) { + OrderStep orderStep = new OrderStep(); + orderStep.setOrderId(orderId); + orderStep.setDesc(desc); + return orderStep; + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/producer/AsyncProducer.java b/src/main/java/com/zerroi/rocketmq/base/producer/AsyncProducer.java new file mode 100644 index 0000000..5dd2c2e --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/producer/AsyncProducer.java @@ -0,0 +1,38 @@ +package com.zerroi.rocketmq.base.producer; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.concurrent.TimeUnit; + +public class AsyncProducer { + public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer"); + + producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + producer.start(); + + for (int i = 0; i < 5; i++) { + Message message = new Message("base", "tag1", ("hello world" + i).getBytes()); + producer.send(message, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + System.out.println("异步发送成功" + sendResult); + } + + @Override + public void onException(Throwable e) { + System.out.println("异步发送异常" + e); + } + }); + + TimeUnit.SECONDS.sleep(1); + } + + producer.shutdown(); + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/producer/OneWayProducer.java b/src/main/java/com/zerroi/rocketmq/base/producer/OneWayProducer.java new file mode 100644 index 0000000..1803d44 --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/producer/OneWayProducer.java @@ -0,0 +1,25 @@ +package com.zerroi.rocketmq.base.producer; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.concurrent.TimeUnit; + +public class OneWayProducer { + public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("OneWayProducer"); + + producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + producer.start(); + + for (int i = 0; i < 4; i++) { + Message message = new Message("base", "tag1", ("hello world" + i).getBytes()); + producer.sendOneway(message); + + TimeUnit.SECONDS.sleep(1); + } + producer.shutdown(); + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/producer/SyncProducer.java b/src/main/java/com/zerroi/rocketmq/base/producer/SyncProducer.java new file mode 100644 index 0000000..068f9ac --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/producer/SyncProducer.java @@ -0,0 +1,34 @@ +package com.zerroi.rocketmq.base.producer; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import java.util.concurrent.TimeUnit; + + +public class SyncProducer { + public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException { + DefaultMQProducer producer = new DefaultMQProducer("group1"); + + producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + producer.start(); + + for (int i = 0; i < 5; i++) { + Message message = new Message("base", "tag1", ("hello world" + i).getBytes()); + SendResult result = producer.send(message); + SendStatus sendStatus = result.getSendStatus(); + String msgId = result.getMsgId(); + int queueId = result.getMessageQueue().getQueueId(); + + + System.out.println("发送状态" + result + " " + sendStatus + " " + msgId + " " + queueId); + TimeUnit.SECONDS.sleep(1); + } + producer.shutdown(); + } +}