Compare commits

..

No commits in common. "master" and "main" have entirely different histories.
master ... main

19 changed files with 28 additions and 598 deletions

26
.gitignore vendored Normal file
View File

@ -0,0 +1,26 @@
# ---> Java
# Compiled class file
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
replay_pid*

2
README.md Normal file
View File

@ -0,0 +1,2 @@
# rocketmq-study

39
pom.xml
View File

@ -1,39 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zerroi.rocketmq</groupId>
<artifactId>rocketmq-study</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.32</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.5.6</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.6</version>
</dependency>
</dependencies>
</project>

View File

@ -1,25 +0,0 @@
package com.zerroi.rocketmq.base.batch;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
@Slf4j
public class BatchConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("BatchTopic", "tag1");
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
log.info("消费者启动");
defaultMQPushConsumer.start();
}
}

View File

@ -1,29 +0,0 @@
package com.zerroi.rocketmq.base.batch;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.ArrayList;
import java.util.List;
public class BatchProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message message = new Message("BatchTopic", "tag1", ("hello world" + i).getBytes());
messageList.add(message);
}
SendResult sendResult = producer.send(messageList);
System.out.println("发送结果" + sendResult);
producer.shutdown();
}
}

View File

@ -1,25 +0,0 @@
package com.zerroi.rocketmq.base.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
public class MessageConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("TransactionProducer");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("base", "tag1");
// 广播模式
defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
defaultMQPushConsumer.start();
}
}

View File

@ -1,27 +0,0 @@
package com.zerroi.rocketmq.base.delay;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
@Slf4j
public class DelayConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
consumer.subscribe("DelayTop", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
log.info("receive message id: {}; delay time: {}", msg.getMsgId(),
System.currentTimeMillis() - msg.getStoreTimestamp());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

View File

@ -1,29 +0,0 @@
package com.zerroi.rocketmq.base.delay;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class DelayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("DelayTop", "tag1", ("hello world" + i).getBytes());
// 设置延迟等级
message.setDelayTimeLevel(2);
SendResult sendResult = producer.send(message);
System.out.println("发送结果" + sendResult);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}

View File

@ -1,24 +0,0 @@
package com.zerroi.rocketmq.base.filter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
@Slf4j
public class FilterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("FilterProducer");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("FilterSqlTopic", MessageSelector.bySql("sequenceId > 5"));
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
log.info("消费者启动成功");
defaultMQPushConsumer.start();
}
}

View File

@ -1,37 +0,0 @@
package com.zerroi.rocketmq.base.filter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class FilterProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("FilterProducer");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("FilterSqlTopic", "tag2", ("hello world" + i).getBytes());
message.putUserProperty("sequenceId", String.valueOf(i));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送成功" + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("异步发送异常" + e);
}
});
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}

View File

@ -1,30 +0,0 @@
package com.zerroi.rocketmq.base.order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
@Slf4j
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
// 订阅主题与TAG
consumer.subscribe("OrderTopic", "OrderTag");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
String message = new String(msg.getBody());
log.info("Receive message: {}", message);
}
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
log.info("OrderConsumer Started");
}
}

View File

@ -1,48 +0,0 @@
package com.zerroi.rocketmq.base.order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
@Slf4j
public class OrderProducer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
List<OrderStep> list = OrderStep.buildOrders();
log.info("需要发送的消息集合大小:{}", list.size());
for (int i = 0; i < list.size(); i++) {
String body = list.get(i) + "";
Message msg = new Message("OrderTopic", "OrderTag", "i" + i, body.getBytes());
try {
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
/**
* 选择消息队列根据业务标识参数向指定的消息队列发送消息
* @param mqs 消息队列集合
* @param msg 消息对象
* @param arg 业务标识参数
* @return 选择的消息队列
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long orderId = (long) arg;
long mqIndex = orderId % mqs.size();
return mqs.get((int) mqIndex);
}
}, list.get(i).getOrderId());
System.out.println(sendResult);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
producer.shutdown();
}
}

View File

@ -1,43 +0,0 @@
package com.zerroi.rocketmq.base.order;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.ArrayList;
import java.util.List;
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class OrderStep {
private long orderId;
private String desc;
public static List<OrderStep> buildOrders() {
List<OrderStep> list = new ArrayList<>();
OrderStep orderStep = new OrderStep();
// id: 1039 创建 付款 推送 完成
// id: 1065: 创建 付款
// id: 7236 创建 付款
// Order 1039
list.add(createOrderStep(1039, "创建"));
list.add(createOrderStep(1065, "创建"));
list.add(createOrderStep(1039, "付款"));
list.add(createOrderStep(7236, "创建"));
list.add(createOrderStep(1065, "付款"));
list.add(createOrderStep(7236, "付款"));
list.add(createOrderStep(1039, "推送"));
list.add(createOrderStep(1039, "完成"));
return list;
}
private static OrderStep createOrderStep(long orderId, String desc) {
OrderStep orderStep = new OrderStep();
orderStep.setOrderId(orderId);
orderStep.setDesc(desc);
return orderStep;
}
}

View File

@ -1,38 +0,0 @@
package com.zerroi.rocketmq.base.producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 5; i++) {
Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送成功" + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("异步发送异常" + e);
}
});
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}

View File

@ -1,25 +0,0 @@
package com.zerroi.rocketmq.base.producer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class OneWayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("OneWayProducer");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 4; i++) {
Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
producer.sendOneway(message);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}

View File

@ -1,34 +0,0 @@
package com.zerroi.rocketmq.base.producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class SyncProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 5; i++) {
Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
SendResult result = producer.send(message);
SendStatus sendStatus = result.getSendStatus();
String msgId = result.getMsgId();
int queueId = result.getMessageQueue().getQueueId();
System.out.println("发送状态" + result + " " + sendStatus + " " + msgId + " " + queueId);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}

View File

@ -1,26 +0,0 @@
package com.zerroi.rocketmq.base.transaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
@Slf4j
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("TransactionTopic", "*");
// 广播模式
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> log.info(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
log.info("Consumer Started.");
defaultMQPushConsumer.start();
}
}

View File

@ -1,57 +0,0 @@
package com.zerroi.rocketmq.base.transaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
@Slf4j
public class TransactionProducer {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer transactionProducer = new TransactionMQProducer("TransactionProducer");
transactionProducer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
// 添加事务监听器
transactionProducer.setTransactionListener(new TransactionListener() {
/**
* 该方法执行本地事务
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return LocalTransactionState
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
if (StringUtils.equals("TagA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
/**
* 该方法进行事务状态回查
* @param msg Check message
* @return LocalTransactionState
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
log.info("messageTag:{}", msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
transactionProducer.start();
String[] tags = {"TagA", "TagB", "TagC"};
// 消费者最后消息两条消息
for (int i = 0; i < 3; i++) {
Message msg = new Message("TransactionTopic", tags[i], ("hello world" + i).getBytes());
transactionProducer.sendMessageInTransaction(msg, null);
}
transactionProducer.start();
}
}

View File

@ -1,62 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
<property name="APP_NAME" value="logtest"/>
<property name="LOG_HOME" value="${log.dir:-logs}/${APP_NAME}"/>
<property name="ENCODER_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{80} - %msg%n"/>
<contextName>${APP_NAME}</contextName>
<!-- Console Log -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>${ENCODER_PATTERN}</Pattern>
</encoder>
</appender>
<!-- File Log -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/output.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${ENCODER_PATTERN}</pattern>
</encoder>
</appender>
<!-- Error Log -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${ENCODER_PATTERN}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
</appender>
<!-- Sync Log -->
<appender name="SYNC_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/sync.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${ENCODER_PATTERN}</pattern>
</encoder>
</appender>
<logger name="log.sync" level="DEBUG" additivity="true">
<appender-ref ref="SYNC_FILE"/>
</logger>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
<appender-ref ref="ERROR_FILE"/>
</root>
</configuration>