Compare commits
No commits in common. "main" and "dev-first" have entirely different histories.
|
@ -1,26 +0,0 @@
|
||||||
# ---> Java
|
|
||||||
# Compiled class file
|
|
||||||
*.class
|
|
||||||
|
|
||||||
# Log file
|
|
||||||
*.log
|
|
||||||
|
|
||||||
# BlueJ files
|
|
||||||
*.ctxt
|
|
||||||
|
|
||||||
# Mobile Tools for Java (J2ME)
|
|
||||||
.mtj.tmp/
|
|
||||||
|
|
||||||
# Package Files #
|
|
||||||
*.jar
|
|
||||||
*.war
|
|
||||||
*.nar
|
|
||||||
*.ear
|
|
||||||
*.zip
|
|
||||||
*.tar.gz
|
|
||||||
*.rar
|
|
||||||
|
|
||||||
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
|
|
||||||
hs_err_pid*
|
|
||||||
replay_pid*
|
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<groupId>com.zerroi.rocketmq</groupId>
|
||||||
|
<artifactId>rocketmq-study</artifactId>
|
||||||
|
<version>1.0-SNAPSHOT</version>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<maven.compiler.source>17</maven.compiler.source>
|
||||||
|
<maven.compiler.target>17</maven.compiler.target>
|
||||||
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
|
</properties>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.rocketmq</groupId>
|
||||||
|
<artifactId>rocketmq-client</artifactId>
|
||||||
|
<version>5.2.0</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
<version>1.18.32</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>ch.qos.logback</groupId>
|
||||||
|
<artifactId>logback-core</artifactId>
|
||||||
|
<version>1.5.6</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>ch.qos.logback</groupId>
|
||||||
|
<artifactId>logback-classic</artifactId>
|
||||||
|
<version>1.5.6</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,25 @@
|
||||||
|
package com.zerroi.rocketmq.base.batch;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class BatchConsumer {
|
||||||
|
public static void main(String[] args) throws MQClientException {
|
||||||
|
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
|
||||||
|
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
|
||||||
|
defaultMQPushConsumer.subscribe("BatchTopic", "tag1");
|
||||||
|
|
||||||
|
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
|
||||||
|
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
|
||||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||||
|
});
|
||||||
|
|
||||||
|
log.info("消费者启动");
|
||||||
|
defaultMQPushConsumer.start();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
package com.zerroi.rocketmq.base.batch;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||||
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
|
import org.apache.rocketmq.common.message.Message;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class BatchProducer {
|
||||||
|
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
|
||||||
|
DefaultMQProducer producer = new DefaultMQProducer("group1");
|
||||||
|
|
||||||
|
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
producer.start();
|
||||||
|
List<Message> messageList = new ArrayList<>();
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Message message = new Message("BatchTopic", "tag1", ("hello world" + i).getBytes());
|
||||||
|
messageList.add(message);
|
||||||
|
}
|
||||||
|
SendResult sendResult = producer.send(messageList);
|
||||||
|
|
||||||
|
System.out.println("发送结果" + sendResult);
|
||||||
|
producer.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
package com.zerroi.rocketmq.base.consumer;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||||
|
|
||||||
|
public class MessageConsumer {
|
||||||
|
public static void main(String[] args) throws MQClientException {
|
||||||
|
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("TransactionProducer");
|
||||||
|
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
|
||||||
|
defaultMQPushConsumer.subscribe("base", "tag1");
|
||||||
|
// 广播模式
|
||||||
|
defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
|
||||||
|
|
||||||
|
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
|
||||||
|
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
|
||||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||||
|
});
|
||||||
|
|
||||||
|
defaultMQPushConsumer.start();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package com.zerroi.rocketmq.base.delay;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class DelayConsumer {
|
||||||
|
public static void main(String[] args) throws MQClientException {
|
||||||
|
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
|
||||||
|
consumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
consumer.subscribe("DelayTop", "*");
|
||||||
|
|
||||||
|
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
|
||||||
|
for (MessageExt msg : msgs) {
|
||||||
|
log.info("receive message id: {}; delay time: {}", msg.getMsgId(),
|
||||||
|
System.currentTimeMillis() - msg.getStoreTimestamp());
|
||||||
|
}
|
||||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||||
|
});
|
||||||
|
|
||||||
|
consumer.start();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
package com.zerroi.rocketmq.base.delay;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||||
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
|
import org.apache.rocketmq.common.message.Message;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class DelayProducer {
|
||||||
|
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
|
||||||
|
DefaultMQProducer producer = new DefaultMQProducer("group1");
|
||||||
|
|
||||||
|
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
producer.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Message message = new Message("DelayTop", "tag1", ("hello world" + i).getBytes());
|
||||||
|
// 设置延迟等级
|
||||||
|
message.setDelayTimeLevel(2);
|
||||||
|
SendResult sendResult = producer.send(message);
|
||||||
|
System.out.println("发送结果" + sendResult);
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
}
|
||||||
|
producer.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
package com.zerroi.rocketmq.base.filter;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
|
import org.apache.rocketmq.client.consumer.MessageSelector;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class FilterConsumer {
|
||||||
|
public static void main(String[] args) throws MQClientException {
|
||||||
|
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("FilterProducer");
|
||||||
|
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
defaultMQPushConsumer.subscribe("FilterSqlTopic", MessageSelector.bySql("sequenceId > 5"));
|
||||||
|
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
|
||||||
|
messageExtList.forEach(messageExt -> System.out.println(new String(messageExt.getBody())));
|
||||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||||
|
});
|
||||||
|
|
||||||
|
log.info("消费者启动成功");
|
||||||
|
defaultMQPushConsumer.start();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package com.zerroi.rocketmq.base.filter;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||||
|
import org.apache.rocketmq.client.producer.SendCallback;
|
||||||
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
|
import org.apache.rocketmq.common.message.Message;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class FilterProducer {
|
||||||
|
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
|
||||||
|
DefaultMQProducer producer = new DefaultMQProducer("FilterProducer");
|
||||||
|
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
producer.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
Message message = new Message("FilterSqlTopic", "tag2", ("hello world" + i).getBytes());
|
||||||
|
message.putUserProperty("sequenceId", String.valueOf(i));
|
||||||
|
|
||||||
|
producer.send(message, new SendCallback() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(SendResult sendResult) {
|
||||||
|
System.out.println("异步发送成功" + sendResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onException(Throwable e) {
|
||||||
|
System.out.println("异步发送异常" + e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
}
|
||||||
|
producer.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
package com.zerroi.rocketmq.base.order;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class OrderConsumer {
|
||||||
|
public static void main(String[] args) throws MQClientException {
|
||||||
|
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
|
||||||
|
consumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
// 订阅主题与TAG
|
||||||
|
consumer.subscribe("OrderTopic", "OrderTag");
|
||||||
|
|
||||||
|
// 注册消息监听器
|
||||||
|
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
|
||||||
|
for (MessageExt msg : msgs) {
|
||||||
|
String message = new String(msg.getBody());
|
||||||
|
log.info("Receive message: {}", message);
|
||||||
|
}
|
||||||
|
return ConsumeOrderlyStatus.SUCCESS;
|
||||||
|
});
|
||||||
|
|
||||||
|
consumer.start();
|
||||||
|
log.info("OrderConsumer Started");
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,48 @@
|
||||||
|
package com.zerroi.rocketmq.base.order;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||||
|
import org.apache.rocketmq.client.producer.MessageQueueSelector;
|
||||||
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
|
import org.apache.rocketmq.common.message.Message;
|
||||||
|
import org.apache.rocketmq.common.message.MessageQueue;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class OrderProducer {
|
||||||
|
public static void main(String[] args) throws MQClientException {
|
||||||
|
DefaultMQProducer producer = new DefaultMQProducer("group1");
|
||||||
|
|
||||||
|
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
producer.start();
|
||||||
|
List<OrderStep> list = OrderStep.buildOrders();
|
||||||
|
log.info("需要发送的消息集合大小:{}", list.size());
|
||||||
|
for (int i = 0; i < list.size(); i++) {
|
||||||
|
String body = list.get(i) + "";
|
||||||
|
Message msg = new Message("OrderTopic", "OrderTag", "i" + i, body.getBytes());
|
||||||
|
try {
|
||||||
|
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
|
||||||
|
/**
|
||||||
|
* 选择消息队列,根据业务标识参数向指定的消息队列发送消息
|
||||||
|
* @param mqs 消息队列集合
|
||||||
|
* @param msg 消息对象
|
||||||
|
* @param arg 业务标识参数
|
||||||
|
* @return 选择的消息队列
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
|
||||||
|
long orderId = (long) arg;
|
||||||
|
long mqIndex = orderId % mqs.size();
|
||||||
|
return mqs.get((int) mqIndex);
|
||||||
|
}
|
||||||
|
}, list.get(i).getOrderId());
|
||||||
|
System.out.println(sendResult);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
producer.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
package com.zerroi.rocketmq.base.order;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import lombok.ToString;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@ToString
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
public class OrderStep {
|
||||||
|
private long orderId;
|
||||||
|
private String desc;
|
||||||
|
|
||||||
|
public static List<OrderStep> buildOrders() {
|
||||||
|
List<OrderStep> list = new ArrayList<>();
|
||||||
|
OrderStep orderStep = new OrderStep();
|
||||||
|
// id: 1039: 创建 付款 推送 完成
|
||||||
|
// id: 1065: 创建 付款
|
||||||
|
// id: 7236 : 创建 付款
|
||||||
|
// Order 1039
|
||||||
|
list.add(createOrderStep(1039, "创建"));
|
||||||
|
list.add(createOrderStep(1065, "创建"));
|
||||||
|
list.add(createOrderStep(1039, "付款"));
|
||||||
|
list.add(createOrderStep(7236, "创建"));
|
||||||
|
list.add(createOrderStep(1065, "付款"));
|
||||||
|
list.add(createOrderStep(7236, "付款"));
|
||||||
|
list.add(createOrderStep(1039, "推送"));
|
||||||
|
list.add(createOrderStep(1039, "完成"));
|
||||||
|
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
private static OrderStep createOrderStep(long orderId, String desc) {
|
||||||
|
OrderStep orderStep = new OrderStep();
|
||||||
|
orderStep.setOrderId(orderId);
|
||||||
|
orderStep.setDesc(desc);
|
||||||
|
return orderStep;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
package com.zerroi.rocketmq.base.producer;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||||
|
import org.apache.rocketmq.client.producer.SendCallback;
|
||||||
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
|
import org.apache.rocketmq.common.message.Message;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class AsyncProducer {
|
||||||
|
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
|
||||||
|
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
|
||||||
|
|
||||||
|
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
producer.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
|
||||||
|
producer.send(message, new SendCallback() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(SendResult sendResult) {
|
||||||
|
System.out.println("异步发送成功" + sendResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onException(Throwable e) {
|
||||||
|
System.out.println("异步发送异常" + e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
producer.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
package com.zerroi.rocketmq.base.producer;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||||
|
import org.apache.rocketmq.common.message.Message;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class OneWayProducer {
|
||||||
|
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
|
||||||
|
DefaultMQProducer producer = new DefaultMQProducer("OneWayProducer");
|
||||||
|
|
||||||
|
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
producer.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
|
||||||
|
producer.sendOneway(message);
|
||||||
|
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
}
|
||||||
|
producer.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
package com.zerroi.rocketmq.base.producer;
|
||||||
|
|
||||||
|
import org.apache.rocketmq.client.exception.MQBrokerException;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||||
|
import org.apache.rocketmq.client.producer.SendResult;
|
||||||
|
import org.apache.rocketmq.client.producer.SendStatus;
|
||||||
|
import org.apache.rocketmq.common.message.Message;
|
||||||
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
|
||||||
|
public class SyncProducer {
|
||||||
|
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
|
||||||
|
DefaultMQProducer producer = new DefaultMQProducer("group1");
|
||||||
|
|
||||||
|
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
producer.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
Message message = new Message("base", "tag1", ("hello world" + i).getBytes());
|
||||||
|
SendResult result = producer.send(message);
|
||||||
|
SendStatus sendStatus = result.getSendStatus();
|
||||||
|
String msgId = result.getMsgId();
|
||||||
|
int queueId = result.getMessageQueue().getQueueId();
|
||||||
|
|
||||||
|
|
||||||
|
System.out.println("发送状态" + result + " " + sendStatus + " " + msgId + " " + queueId);
|
||||||
|
TimeUnit.SECONDS.sleep(1);
|
||||||
|
}
|
||||||
|
producer.shutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
package com.zerroi.rocketmq.base.transaction;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||||
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class TransactionConsumer {
|
||||||
|
public static void main(String[] args) throws MQClientException {
|
||||||
|
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
|
||||||
|
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
|
||||||
|
defaultMQPushConsumer.subscribe("TransactionTopic", "*");
|
||||||
|
// 广播模式
|
||||||
|
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
|
||||||
|
messageExtList.forEach(messageExt -> log.info(new String(messageExt.getBody())));
|
||||||
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||||
|
});
|
||||||
|
|
||||||
|
log.info("Consumer Started.");
|
||||||
|
defaultMQPushConsumer.start();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,57 @@
|
||||||
|
package com.zerroi.rocketmq.base.transaction;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.rocketmq.client.exception.MQClientException;
|
||||||
|
import org.apache.rocketmq.client.producer.LocalTransactionState;
|
||||||
|
import org.apache.rocketmq.client.producer.TransactionListener;
|
||||||
|
import org.apache.rocketmq.client.producer.TransactionMQProducer;
|
||||||
|
import org.apache.rocketmq.common.message.Message;
|
||||||
|
import org.apache.rocketmq.common.message.MessageExt;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class TransactionProducer {
|
||||||
|
public static void main(String[] args) throws MQClientException {
|
||||||
|
TransactionMQProducer transactionProducer = new TransactionMQProducer("TransactionProducer");
|
||||||
|
transactionProducer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||||
|
|
||||||
|
// 添加事务监听器
|
||||||
|
transactionProducer.setTransactionListener(new TransactionListener() {
|
||||||
|
/**
|
||||||
|
* 该方法执行本地事务
|
||||||
|
* @param msg Half(prepare) message
|
||||||
|
* @param arg Custom business parameter
|
||||||
|
* @return LocalTransactionState
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
|
||||||
|
if (StringUtils.equals("TagA", msg.getTags())) {
|
||||||
|
return LocalTransactionState.COMMIT_MESSAGE;
|
||||||
|
} else if (StringUtils.equals("TagB", msg.getTags())) {
|
||||||
|
return LocalTransactionState.ROLLBACK_MESSAGE;
|
||||||
|
} else {
|
||||||
|
return LocalTransactionState.UNKNOW;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 该方法进行事务状态回查
|
||||||
|
* @param msg Check message
|
||||||
|
* @return LocalTransactionState
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
|
||||||
|
log.info("messageTag:{}", msg.getTags());
|
||||||
|
return LocalTransactionState.COMMIT_MESSAGE;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
transactionProducer.start();
|
||||||
|
String[] tags = {"TagA", "TagB", "TagC"};
|
||||||
|
// 消费者最后消息两条消息
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
Message msg = new Message("TransactionTopic", tags[i], ("hello world" + i).getBytes());
|
||||||
|
transactionProducer.sendMessageInTransaction(msg, null);
|
||||||
|
}
|
||||||
|
transactionProducer.start();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,62 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<configuration debug="false">
|
||||||
|
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
|
||||||
|
|
||||||
|
<property name="APP_NAME" value="logtest"/>
|
||||||
|
<property name="LOG_HOME" value="${log.dir:-logs}/${APP_NAME}"/>
|
||||||
|
<property name="ENCODER_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{80} - %msg%n"/>
|
||||||
|
<contextName>${APP_NAME}</contextName>
|
||||||
|
|
||||||
|
<!-- Console Log -->
|
||||||
|
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||||
|
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
|
||||||
|
<Pattern>${ENCODER_PATTERN}</Pattern>
|
||||||
|
</encoder>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<!-- File Log -->
|
||||||
|
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||||
|
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||||
|
<fileNamePattern>${LOG_HOME}/output.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||||
|
<maxHistory>7</maxHistory>
|
||||||
|
</rollingPolicy>
|
||||||
|
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
|
||||||
|
<pattern>${ENCODER_PATTERN}</pattern>
|
||||||
|
</encoder>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<!-- Error Log -->
|
||||||
|
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||||
|
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||||
|
<fileNamePattern>${LOG_HOME}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||||
|
<maxHistory>7</maxHistory>
|
||||||
|
</rollingPolicy>
|
||||||
|
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
|
||||||
|
<pattern>${ENCODER_PATTERN}</pattern>
|
||||||
|
</encoder>
|
||||||
|
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
|
||||||
|
<level>WARN</level>
|
||||||
|
</filter>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<!-- Sync Log -->
|
||||||
|
<appender name="SYNC_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||||
|
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||||
|
<fileNamePattern>${LOG_HOME}/sync.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||||
|
<maxHistory>7</maxHistory>
|
||||||
|
</rollingPolicy>
|
||||||
|
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
|
||||||
|
<pattern>${ENCODER_PATTERN}</pattern>
|
||||||
|
</encoder>
|
||||||
|
</appender>
|
||||||
|
|
||||||
|
<logger name="log.sync" level="DEBUG" additivity="true">
|
||||||
|
<appender-ref ref="SYNC_FILE"/>
|
||||||
|
</logger>
|
||||||
|
|
||||||
|
<root level="INFO">
|
||||||
|
<appender-ref ref="STDOUT"/>
|
||||||
|
<appender-ref ref="FILE"/>
|
||||||
|
<appender-ref ref="ERROR_FILE"/>
|
||||||
|
</root>
|
||||||
|
</configuration>
|
Loading…
Reference in New Issue