logback配置,订单消息顺序消费
This commit is contained in:
parent
8b8237d9ab
commit
61c0539c4c
10
pom.xml
10
pom.xml
|
@ -24,6 +24,16 @@
|
|||
<artifactId>lombok</artifactId>
|
||||
<version>1.18.32</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-core</artifactId>
|
||||
<version>1.5.6</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.5.6</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -1,35 +0,0 @@
|
|||
package com.zerroi.rocketmq.base.consumer;
|
||||
|
||||
import com.zerroi.rocketmq.base.order.OrderStep;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.client.producer.MessageQueueSelector;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.common.message.MessageQueue;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class OrderProducer {
|
||||
public static void main(String[] args) throws MQClientException {
|
||||
DefaultMQProducer producer = new DefaultMQProducer("group1");
|
||||
|
||||
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||
producer.start();
|
||||
|
||||
List<OrderStep> list = OrderStep.buildOrders();
|
||||
list.forEach(orderStep -> {
|
||||
String body = orderStep + "";
|
||||
Message msg = new Message("OrderTopic", "OrderTag", "i" + orderStep.getOrderId(), body.getBytes());
|
||||
try {
|
||||
producer.send(msg, new MessageQueueSelector() {
|
||||
@Override
|
||||
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
|
||||
return null;
|
||||
}
|
||||
}, orderStep.getOrderId());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package com.zerroi.rocketmq.base.order;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
|
||||
@Slf4j
|
||||
public class OrderConsumer {
|
||||
public static void main(String[] args) throws MQClientException {
|
||||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
|
||||
consumer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||
// 订阅主题与TAG
|
||||
consumer.subscribe("OrderTopic", "OrderTag");
|
||||
|
||||
// 注册消息监听器
|
||||
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
|
||||
for (MessageExt msg : msgs) {
|
||||
String message = new String(msg.getBody());
|
||||
log.info("Receive message: {}", message);
|
||||
}
|
||||
return ConsumeOrderlyStatus.SUCCESS;
|
||||
});
|
||||
|
||||
consumer.start();
|
||||
log.info("OrderConsumer Started");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
package com.zerroi.rocketmq.base.order;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.client.producer.MessageQueueSelector;
|
||||
import org.apache.rocketmq.client.producer.SendResult;
|
||||
import org.apache.rocketmq.common.message.Message;
|
||||
import org.apache.rocketmq.common.message.MessageQueue;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
public class OrderProducer {
|
||||
public static void main(String[] args) throws MQClientException {
|
||||
DefaultMQProducer producer = new DefaultMQProducer("group1");
|
||||
|
||||
producer.setNamesrvAddr("192.168.150.110:9876;192.168.150.111:9876");
|
||||
producer.start();
|
||||
List<OrderStep> list = OrderStep.buildOrders();
|
||||
log.info("需要发送的消息集合大小:{}", list.size());
|
||||
for (int i = 0; i < list.size(); i++) {
|
||||
String body = list.get(i) + "";
|
||||
Message msg = new Message("OrderTopic", "OrderTag", "i" + i, body.getBytes());
|
||||
try {
|
||||
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
|
||||
/**
|
||||
* 选择消息队列,根据业务标识参数向指定的消息队列发送消息
|
||||
* @param mqs 消息队列集合
|
||||
* @param msg 消息对象
|
||||
* @param arg 业务标识参数
|
||||
* @return 选择的消息队列
|
||||
*/
|
||||
@Override
|
||||
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
|
||||
long orderId = (long) arg;
|
||||
long mqIndex = orderId % mqs.size();
|
||||
return mqs.get((int) mqIndex);
|
||||
}
|
||||
}, list.get(i).getOrderId());
|
||||
System.out.println(sendResult);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
producer.shutdown();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration debug="false">
|
||||
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
|
||||
|
||||
<property name="APP_NAME" value="logtest"/>
|
||||
<property name="LOG_HOME" value="${log.dir:-logs}/${APP_NAME}"/>
|
||||
<property name="ENCODER_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{80} - %msg%n"/>
|
||||
<contextName>${APP_NAME}</contextName>
|
||||
|
||||
<!-- Console Log -->
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
|
||||
<Pattern>${ENCODER_PATTERN}</Pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- File Log -->
|
||||
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>${LOG_HOME}/output.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<maxHistory>7</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
|
||||
<pattern>${ENCODER_PATTERN}</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- Error Log -->
|
||||
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>${LOG_HOME}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<maxHistory>7</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
|
||||
<pattern>${ENCODER_PATTERN}</pattern>
|
||||
</encoder>
|
||||
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
|
||||
<level>WARN</level>
|
||||
</filter>
|
||||
</appender>
|
||||
|
||||
<!-- Sync Log -->
|
||||
<appender name="SYNC_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>${LOG_HOME}/sync.%d{yyyy-MM-dd}.log</fileNamePattern>
|
||||
<maxHistory>7</maxHistory>
|
||||
</rollingPolicy>
|
||||
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
|
||||
<pattern>${ENCODER_PATTERN}</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<logger name="log.sync" level="DEBUG" additivity="true">
|
||||
<appender-ref ref="SYNC_FILE"/>
|
||||
</logger>
|
||||
|
||||
<root level="INFO">
|
||||
<appender-ref ref="STDOUT"/>
|
||||
<appender-ref ref="FILE"/>
|
||||
<appender-ref ref="ERROR_FILE"/>
|
||||
</root>
|
||||
</configuration>
|
Loading…
Reference in New Issue