transaction

This commit is contained in:
liwei 2024-07-13 15:06:10 +08:00
parent b0f2b28281
commit 38a400b7c9
3 changed files with 84 additions and 1 deletions

View File

@ -8,7 +8,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
public class MessageConsumer { public class MessageConsumer {
public static void main(String[] args) throws MQClientException { public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("TransactionProducer");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("base", "tag1"); defaultMQPushConsumer.subscribe("base", "tag1");

View File

@ -0,0 +1,26 @@
package com.zerroi.rocketmq.base.transaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
@Slf4j
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1");
defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
defaultMQPushConsumer.subscribe("TransactionTopic", "*");
// 广播模式
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> {
messageExtList.forEach(messageExt -> log.info(new String(messageExt.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
log.info("Consumer Started.");
defaultMQPushConsumer.start();
}
}

View File

@ -0,0 +1,57 @@
package com.zerroi.rocketmq.base.transaction;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
@Slf4j
public class TransactionProducer {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer transactionProducer = new TransactionMQProducer("TransactionProducer");
transactionProducer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
// 添加事务监听器
transactionProducer.setTransactionListener(new TransactionListener() {
/**
* 该方法执行本地事务
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return LocalTransactionState
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
if (StringUtils.equals("TagA", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
/**
* 该方法进行事务状态回查
* @param msg Check message
* @return LocalTransactionState
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
log.info("messageTag:{}", msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
transactionProducer.start();
String[] tags = {"TagA", "TagB", "TagC"};
// 消费者最后消息两条消息
for (int i = 0; i < 3; i++) {
Message msg = new Message("TransactionTopic", tags[i], ("hello world" + i).getBytes());
transactionProducer.sendMessageInTransaction(msg, null);
}
transactionProducer.start();
}
}