rocket mq init

This commit is contained in:
liwei 2024-07-13 13:22:07 +08:00
commit 8b8237d9ab
7 changed files with 231 additions and 0 deletions

29
pom.xml Normal file
View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zerroi.rocketmq</groupId>
<artifactId>rocketmq-study</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.32</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,27 @@
package com.zerroi.rocketmq.base.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
public class MessageConsumer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("base", "tag1");
// 广播模式
defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
defaultMQPushConsumer.start();
// TimeUnit.SECONDS.sleep(2);
// defaultMQPushConsumer.shutdown();
}
}

View File

@ -0,0 +1,35 @@
package com.zerroi.rocketmq.base.consumer;
import com.zerroi.rocketmq.base.order.OrderStep;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class OrderProducer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
List<OrderStep> list = OrderStep.buildOrders();
list.forEach(orderStep -> {
String body = orderStep + "";
Message msg = new Message("OrderTopic", "OrderTag", "i" + orderStep.getOrderId(), body.getBytes());
try {
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
}, orderStep.getOrderId());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

View File

@ -0,0 +1,43 @@
package com.zerroi.rocketmq.base.order;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.ArrayList;
import java.util.List;
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class OrderStep {
private long orderId;
private String desc;
public static List<OrderStep> buildOrders() {
List<OrderStep> list = new ArrayList<>();
OrderStep orderStep = new OrderStep();
// id: 1039 创建 付款 推送 完成
// id: 1065: 创建 付款
// id: 7236 创建 付款
// Order 1039
list.add(createOrderStep(1039, "创建"));
list.add(createOrderStep(1065, "创建"));
list.add(createOrderStep(1039, "付款"));
list.add(createOrderStep(7236, "创建"));
list.add(createOrderStep(1065, "付款"));
list.add(createOrderStep(7236, "付款"));
list.add(createOrderStep(1039, "推送"));
list.add(createOrderStep(1039, "完成"));
return list;
}
private static OrderStep createOrderStep(long orderId, String desc) {
OrderStep orderStep = new OrderStep();
orderStep.setOrderId(orderId);
orderStep.setDesc(desc);
return orderStep;
}
}

View File

@ -0,0 +1,38 @@
package com.zerroi.rocketmq.base.producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 5; i++) {
Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送成功" + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("异步发送异常" + e);
}
});
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}

View File

@ -0,0 +1,25 @@
package com.zerroi.rocketmq.base.producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class OneWayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("OneWayProducer");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 4; i++) {
Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
producer.sendOneway(message);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}

View File

@ -0,0 +1,34 @@
package com.zerroi.rocketmq.base.producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class SyncProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 5; i++) {
Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
SendResult result = producer.send(message);
SendStatus sendStatus = result.getSendStatus();
String msgId = result.getMsgId();
int queueId = result.getMessageQueue().getQueueId();
System.out.println("发送状态" + result + " " + sendStatus + " " + msgId + " " + queueId);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}