diff --git a/pom.xml b/pom.xml index 26b4b88..de15186 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,16 @@ lombok 1.18.32 + + ch.qos.logback + logback-core + 1.5.6 + + + ch.qos.logback + logback-classic + 1.5.6 + \ No newline at end of file diff --git a/src/main/java/com/zerroi/rocketmq/base/consumer/OrderProducer.java b/src/main/java/com/zerroi/rocketmq/base/consumer/OrderProducer.java deleted file mode 100644 index 4bc154f..0000000 --- a/src/main/java/com/zerroi/rocketmq/base/consumer/OrderProducer.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.zerroi.rocketmq.base.consumer; - -import com.zerroi.rocketmq.base.order.OrderStep; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.MessageQueueSelector; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageQueue; - -import java.util.List; - -public class OrderProducer { - public static void main(String[] args) throws MQClientException { - DefaultMQProducer producer = new DefaultMQProducer("group1"); - - producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); - producer.start(); - - List list = OrderStep.buildOrders(); - list.forEach(orderStep -> { - String body = orderStep + ""; - Message msg = new Message("OrderTopic", "OrderTag", "i" + orderStep.getOrderId(), body.getBytes()); - try { - producer.send(msg, new MessageQueueSelector() { - @Override - public MessageQueue select(List mqs, Message msg, Object arg) { - return null; - } - }, orderStep.getOrderId()); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } -} diff --git a/src/main/java/com/zerroi/rocketmq/base/order/OrderConsumer.java b/src/main/java/com/zerroi/rocketmq/base/order/OrderConsumer.java new file mode 100644 index 0000000..b0bf7a4 --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/order/OrderConsumer.java @@ -0,0 +1,30 @@ +package com.zerroi.rocketmq.base.order; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; + +@Slf4j +public class OrderConsumer { + public static void main(String[] args) throws MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); + consumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + // 订阅主题与TAG + consumer.subscribe("OrderTopic", "OrderTag"); + + // 注册消息监听器 + consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { + for (MessageExt msg : msgs) { + String message = new String(msg.getBody()); + log.info("Receive message: {}", message); + } + return ConsumeOrderlyStatus.SUCCESS; + }); + + consumer.start(); + log.info("OrderConsumer Started"); + } +} diff --git a/src/main/java/com/zerroi/rocketmq/base/order/OrderProducer.java b/src/main/java/com/zerroi/rocketmq/base/order/OrderProducer.java new file mode 100644 index 0000000..0f4fe1a --- /dev/null +++ b/src/main/java/com/zerroi/rocketmq/base/order/OrderProducer.java @@ -0,0 +1,48 @@ +package com.zerroi.rocketmq.base.order; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; + +@Slf4j +public class OrderProducer { + public static void main(String[] args) throws MQClientException { + DefaultMQProducer producer = new DefaultMQProducer("group1"); + + producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876"); + producer.start(); + List list = OrderStep.buildOrders(); + log.info("需要发送的消息集合大小:{}", list.size()); + for (int i = 0; i < list.size(); i++) { + String body = list.get(i) + ""; + Message msg = new Message("OrderTopic", "OrderTag", "i" + i, body.getBytes()); + try { + SendResult sendResult = producer.send(msg, new MessageQueueSelector() { + /** + * 选择消息队列,根据业务标识参数向指定的消息队列发送消息 + * @param mqs 消息队列集合 + * @param msg 消息对象 + * @param arg 业务标识参数 + * @return 选择的消息队列 + */ + @Override + public MessageQueue select(List mqs, Message msg, Object arg) { + long orderId = (long) arg; + long mqIndex = orderId % mqs.size(); + return mqs.get((int) mqIndex); + } + }, list.get(i).getOrderId()); + System.out.println(sendResult); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + producer.shutdown(); + } +} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..ddb3f7d --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,62 @@ + + + + + + + + ${APP_NAME} + + + + + ${ENCODER_PATTERN} + + + + + + + ${LOG_HOME}/output.%d{yyyy-MM-dd}.log + 7 + + + ${ENCODER_PATTERN} + + + + + + + ${LOG_HOME}/error.%d{yyyy-MM-dd}.log + 7 + + + ${ENCODER_PATTERN} + + + WARN + + + + + + + ${LOG_HOME}/sync.%d{yyyy-MM-dd}.log + 7 + + + ${ENCODER_PATTERN} + + + + + + + + + + + + +