This commit is contained in:
77
2024-03-24 19:07:45 +08:00
parent d0b1f961e8
commit e6611ea05e
45 changed files with 935 additions and 104 deletions

View File

@@ -0,0 +1,21 @@
package com.ruoyi.xq.mq;
import com.alibaba.fastjson.JSON;
import com.ruoyi.xq.mq.consumer.CommonConsumer;
import com.ruoyi.xq.mq.handle.dto.CommonDTO;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class AmqpProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public <T extends CommonDTO> void sendCommonMq(T dto){
rabbitTemplate.convertAndSend(CommonConsumer.COMMON_EXCHANGE,
CommonConsumer.COMMON_KEY, JSON.toJSONString(dto));
}
}

View File

@@ -0,0 +1,8 @@
package com.ruoyi.xq.mq;
public enum CommonConsumerEnum {
/**
* 分销
*/
CALCULATE_SALES
}

View File

@@ -0,0 +1,31 @@
package com.ruoyi.xq.mq.config;
import com.ruoyi.xq.mq.handle.IHandle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class HandleConfig {
public static Map<String, IHandle> MAP = new HashMap<>();
@Autowired
private List<IHandle> handles;
@PostConstruct
public void init(){
for (IHandle handle : handles) {
MAP.put(handle.getType().name(),handle);
}
}
public IHandle getHandle(String type){
return MAP.get(type);
}
}

View File

@@ -0,0 +1,26 @@
package com.ruoyi.xq.mq.config;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
//并发数量
public static final int DEFAULT_CONCURRENT = Runtime.getRuntime().availableProcessors();
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(DEFAULT_CONCURRENT*2);
factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT*2);
configurer.configure(factory, connectionFactory);
return factory;
}
}

View File

@@ -0,0 +1,43 @@
package com.ruoyi.xq.mq.consumer;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.xq.mq.config.HandleConfig;
import com.ruoyi.xq.mq.handle.IHandle;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class CommonConsumer {
public final static String COMMON_QUEUE = "xqCommonQueue";
public final static String COMMON_EXCHANGE = "xqCommonExchange";
public final static String COMMON_KEY = "xqCommonKey";
@Autowired
private HandleConfig handleConfig;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = COMMON_QUEUE, durable = "false", autoDelete = "false"),
exchange = @Exchange(value = COMMON_EXCHANGE),
key = COMMON_KEY)
,containerFactory = "customContainerFactory")
public void calculateSalesQueue(String message) {
log.info("队列消息处理-开始: message=" + message);
try {
JSONObject object = JSON.parseObject(message);
String type = object.getString("type");
IHandle handle = handleConfig.getHandle(type);
handle.run(message);
}catch (Exception e){
log.error("队列消息处理-失败: message=" + message,e);
}
log.info("队列消息处理-结束: message=" + message);
}
}

View File

@@ -0,0 +1,11 @@
package com.ruoyi.xq.mq.handle;
import com.ruoyi.xq.mq.CommonConsumerEnum;
public interface IHandle {
void run(String message);
CommonConsumerEnum getType();
}

View File

@@ -0,0 +1,16 @@
package com.ruoyi.xq.mq.handle.dto;
import com.ruoyi.xq.mq.CommonConsumerEnum;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class CalculateSalesHandleDTO extends CommonDTO {
private Long consumerLogId;
private LocalDateTime time;
public CalculateSalesHandleDTO() {
this.setType(CommonConsumerEnum.CALCULATE_SALES);
this.time = LocalDateTime.now();
}
}

View File

@@ -0,0 +1,9 @@
package com.ruoyi.xq.mq.handle.dto;
import com.ruoyi.xq.mq.CommonConsumerEnum;
import lombok.Data;
@Data
public class CommonDTO {
private CommonConsumerEnum type;
}