From 38a400b7c9098f952d0082e2ee7eecbe4cd4fe3c Mon Sep 17 00:00:00 2001 From: liwei Date: Sat, 13 Jul 2024 15:06:10 +0800 Subject: [PATCH] transaction --- .../base/consumer/MessageConsumer.java | 2 +- .../base/transaction/TransactionConsumer.java | 26 +++++++++ .../base/transaction/TransactionProducer.java | 57 +++++++++++++++++++ 3 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/zerroi/rocketmq/base/transaction/TransactionConsumer.java create mode 100644 src/main/java/com/zerroi/rocketmq/base/transaction/TransactionProducer.java diff --git a/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java b/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java index 703a64c..a16596b 100644 --- a/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java +++ b/src/main/java/com/zerroi/rocketmq/base/consumer/MessageConsumer.java @@ -8,7 +8,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; public class MessageConsumer { public static void main(String[] args) throws MQClientException { - DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1"); + DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("TransactionProducer"); defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); defaultMQPushConsumer.subscribe("base", "tag1"); diff --git a/src/main/java/com/zerroi/rocketmq/base/transaction/TransactionConsumer.java b/src/main/java/com/zerroi/rocketmq/base/transaction/TransactionConsumer.java new file mode 100644 index 0000000..b272c3c --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/transaction/TransactionConsumer.java @@ -0,0 +1,26 @@ +package com.zerroi.rocketmq.base.transaction; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; + +@Slf4j +public class TransactionConsumer { + public static void main(String[] args) throws MQClientException { + DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group1"); + defaultMQPushConsumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + + defaultMQPushConsumer.subscribe("TransactionTopic", "*"); + // 广播模式 + defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (messageExtList, context) -> { + messageExtList.forEach(messageExt -> log.info(new String(messageExt.getBody()))); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + + log.info("Consumer Started."); + defaultMQPushConsumer.start(); + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/transaction/TransactionProducer.java b/src/main/java/com/zerroi/rocketmq/base/transaction/TransactionProducer.java new file mode 100644 index 0000000..ba2e2cc --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/transaction/TransactionProducer.java @@ -0,0 +1,57 @@ +package com.zerroi.rocketmq.base.transaction; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; + +@Slf4j +public class TransactionProducer { + public static void main(String[] args) throws MQClientException { + TransactionMQProducer transactionProducer = new TransactionMQProducer("TransactionProducer"); + transactionProducer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + + // 添加事务监听器 + transactionProducer.setTransactionListener(new TransactionListener() { + /** + * 该方法执行本地事务 + * @param msg Half(prepare) message + * @param arg Custom business parameter + * @return LocalTransactionState + */ + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + if (StringUtils.equals("TagA", msg.getTags())) { + return LocalTransactionState.COMMIT_MESSAGE; + } else if (StringUtils.equals("TagB", msg.getTags())) { + return LocalTransactionState.ROLLBACK_MESSAGE; + } else { + return LocalTransactionState.UNKNOW; + } + } + + /** + * 该方法进行事务状态回查 + * @param msg Check message + * @return LocalTransactionState + */ + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + log.info("messageTag:{}", msg.getTags()); + return LocalTransactionState.COMMIT_MESSAGE; + } + }); + transactionProducer.start(); + String[] tags = {"TagA", "TagB", "TagC"}; + // 消费者最后消息两条消息 + for (int i = 0; i < 3; i++) { + Message msg = new Message("TransactionTopic", tags[i], ("hello world" + i).getBytes()); + transactionProducer.sendMessageInTransaction(msg, null); + } + transactionProducer.start(); + } +}