Compare commits

..

6 Commits

Author SHA1 Message Date
liwei 38a400b7c9 transaction 2024-07-13 15:06:10 +08:00
liwei b0f2b28281 filter by sql 2024-07-13 14:36:24 +08:00
liwei f6573b355f tag filter 2024-07-13 14:30:40 +08:00
liwei 11eb0935ee 批量消息 2024-07-13 14:19:35 +08:00
liwei e7ea8b3dc3 延迟消息 2024-07-13 14:11:15 +08:00
liwei 61c0539c4c logback配置,订单消息顺序消费 2024-07-13 13:49:51 +08:00
14 changed files with 406 additions and 39 deletions

10
pom.xml
View File

@ -24,6 +24,16 @@
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>1.18.32</version> <version>1.18.32</version>
</dependency> </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.5.6</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.6</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,25 @@
package com.zerroi.rocketmq.base.batch;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
@Slf4j
public class BatchConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("BatchTopic", "tag1");
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
log.info("消费者启动");
defaultMQPushConsumer.start();
}
}

View File

@ -0,0 +1,29 @@
package com.zerroi.rocketmq.base.batch;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.ArrayList;
import java.util.List;
public class BatchProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message message = new Message("BatchTopic", "tag1", ("hello world" + i).getBytes());
messageList.add(message);
}
SendResult sendResult = producer.send(messageList);
System.out.println("发送结果" + sendResult);
producer.shutdown();
}
}

View File

@ -7,8 +7,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
public class MessageConsumer { public class MessageConsumer {
public static void main(String[] args) throws MQClientException, InterruptedException { public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("TransactionProducer");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("base", "tag1"); defaultMQPushConsumer.subscribe("base", "tag1");
@ -21,7 +21,5 @@ public class MessageConsumer {
}); });
defaultMQPushConsumer.start(); defaultMQPushConsumer.start();
// TimeUnit.SECONDS.sleep(2);
// defaultMQPushConsumer.shutdown();
} }
} }

View File

@ -1,35 +0,0 @@
package com.zerroi.rocketmq.base.consumer;
import com.zerroi.rocketmq.base.order.OrderStep;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class OrderProducer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
List<OrderStep> list = OrderStep.buildOrders();
list.forEach(orderStep -> {
String body = orderStep + "";
Message msg = new Message("OrderTopic", "OrderTag", "i" + orderStep.getOrderId(), body.getBytes());
try {
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
}, orderStep.getOrderId());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

View File

@ -0,0 +1,27 @@
package com.zerroi.rocketmq.base.delay;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
@Slf4j
public class DelayConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
consumer.subscribe("DelayTop", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
log.info("receive message id: {}; delay time: {}", msg.getMsgId(),
System.currentTimeMillis() - msg.getStoreTimestamp());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}

View File

@ -0,0 +1,29 @@
package com.zerroi.rocketmq.base.delay;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class DelayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("DelayTop", "tag1", ("hello world" + i).getBytes());
// 设置延迟等级
message.setDelayTimeLevel(2);
SendResult sendResult = producer.send(message);
System.out.println("发送结果" + sendResult);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}

View File

@ -0,0 +1,24 @@
package com.zerroi.rocketmq.base.filter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
@Slf4j
public class FilterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("FilterProducer");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("FilterSqlTopic", MessageSelector.bySql("sequenceId > 5"));
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
log.info("消费者启动成功");
defaultMQPushConsumer.start();
}
}

View File

@ -0,0 +1,37 @@
package com.zerroi.rocketmq.base.filter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class FilterProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("FilterProducer");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("FilterSqlTopic", "tag2", ("hello world" + i).getBytes());
message.putUserProperty("sequenceId", String.valueOf(i));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送成功" + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("异步发送异常" + e);
}
});
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}

View File

@ -0,0 +1,30 @@
package com.zerroi.rocketmq.base.order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
@Slf4j
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
// 订阅主题与TAG
consumer.subscribe("OrderTopic", "OrderTag");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
String message = new String(msg.getBody());
log.info("Receive message: {}", message);
}
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
log.info("OrderConsumer Started");
}
}

View File

@ -0,0 +1,48 @@
package com.zerroi.rocketmq.base.order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
@Slf4j
public class OrderProducer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
producer.start();
List<OrderStep> list = OrderStep.buildOrders();
log.info("需要发送的消息集合大小:{}", list.size());
for (int i = 0; i < list.size(); i++) {
String body = list.get(i) + "";
Message msg = new Message("OrderTopic", "OrderTag", "i" + i, body.getBytes());
try {
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
/**
* 选择消息队列根据业务标识参数向指定的消息队列发送消息
* @param mqs 消息队列集合
* @param msg 消息对象
* @param arg 业务标识参数
* @return 选择的消息队列
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long orderId = (long) arg;
long mqIndex = orderId % mqs.size();
return mqs.get((int) mqIndex);
}
}, list.get(i).getOrderId());
System.out.println(sendResult);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
producer.shutdown();
}
}

View File

@ -0,0 +1,26 @@
package com.zerroi.rocketmq.base.transaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
@Slf4j
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("TransactionTopic", "*");
// 广播模式
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> log.info(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
log.info("Consumer Started.");
defaultMQPushConsumer.start();
}
}

View File

@ -0,0 +1,57 @@
package com.zerroi.rocketmq.base.transaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
@Slf4j
public class TransactionProducer {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer transactionProducer = new TransactionMQProducer("TransactionProducer");
transactionProducer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
// 添加事务监听器
transactionProducer.setTransactionListener(new TransactionListener() {
/**
* 该方法执行本地事务
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return LocalTransactionState
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
if (StringUtils.equals("TagA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
/**
* 该方法进行事务状态回查
* @param msg Check message
* @return LocalTransactionState
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
log.info("messageTag:{}", msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
transactionProducer.start();
String[] tags = {"TagA", "TagB", "TagC"};
// 消费者最后消息两条消息
for (int i = 0; i < 3; i++) {
Message msg = new Message("TransactionTopic", tags[i], ("hello world" + i).getBytes());
transactionProducer.sendMessageInTransaction(msg, null);
}
transactionProducer.start();
}
}

View File

@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
<property name="APP_NAME" value="logtest"/>
<property name="LOG_HOME" value="${log.dir:-logs}/${APP_NAME}"/>
<property name="ENCODER_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{80} - %msg%n"/>
<contextName>${APP_NAME}</contextName>
<!-- Console Log -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>${ENCODER_PATTERN}</Pattern>
</encoder>
</appender>
<!-- File Log -->
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/output.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${ENCODER_PATTERN}</pattern>
</encoder>
</appender>
<!-- Error Log -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${ENCODER_PATTERN}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
</appender>
<!-- Sync Log -->
<appender name="SYNC_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/sync.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>${ENCODER_PATTERN}</pattern>
</encoder>
</appender>
<logger name="log.sync" level="DEBUG" additivity="true">
<appender-ref ref="SYNC_FILE"/>
</logger>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
<appender-ref ref="ERROR_FILE"/>
</root>
</configuration>