This commit is contained in:
dute7liang
2024-01-19 23:08:25 +08:00
parent 95f03f9a35
commit 168ce04086
17 changed files with 522 additions and 1 deletions

View File

@@ -2,6 +2,8 @@ package com.ruoyi.cai.mq;
import com.alibaba.fastjson.JSON;
import com.ruoyi.cai.mq.consumer.CalculateSalesQueueConsumer;
import com.ruoyi.cai.mq.consumer.CommonConsumer;
import com.ruoyi.cai.mq.dto.CommonDTO;
import com.ruoyi.cai.mq.dto.CommonDelayDto;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@@ -16,6 +18,12 @@ public class AmqpProducer {
rabbitTemplate.convertAndSend(CalculateSalesQueueConsumer.CALCULATE_SALES_EXCHANGE, CalculateSalesQueueConsumer.CALCULATE_SALES_KEY, message);
}
public <T extends CommonDTO> void sendCommonMq(T dto){
rabbitTemplate.convertAndSend(CommonConsumer.COMMON_EXCHANGE,
CommonConsumer.COMMON_KEY, JSON.toJSONString(dto));
}
public void sendCheckTimeOut(String message,Integer timeout){
rabbitTemplate.convertAndSend(CheckTimeOutMqConfig.EXCHANGE_NAME,
CheckTimeOutMqConfig.ROUTING_KEY,

View File

@@ -0,0 +1,44 @@
package com.ruoyi.cai.mq.consumer;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.cai.mq.handle.HandleConfig;
import com.ruoyi.cai.mq.handle.IHandle;
import com.ruoyi.cai.service.ConsumeLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class CommonConsumer {
public final static String COMMON_QUEUE = "caiCommonQueue";
public final static String COMMON_EXCHANGE = "caiCommonExchange";
public final static String COMMON_KEY = "caiCommonKey";
@Autowired
private HandleConfig handleConfig;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = COMMON_QUEUE, durable = "false", autoDelete = "false"),
exchange = @Exchange(value = COMMON_EXCHANGE),
key = COMMON_KEY)
,containerFactory = "customContainerFactory")
public void calculateSalesQueue(String message) {
log.info("队列消息处理-开始: message=" + message);
try {
JSONObject object = JSON.parseObject(message);
String type = object.getString("type");
IHandle handle = handleConfig.getHandle(type);
handle.run(message);
}catch (Exception e){
log.error("队列消息处理-失败: message=" + message,e);
}
log.info("队列消息处理-结束: message=" + message);
}
}

View File

@@ -0,0 +1,5 @@
package com.ruoyi.cai.mq.dto;
public enum CommonConsumerEnum {
RANK
}

View File

@@ -0,0 +1,8 @@
package com.ruoyi.cai.mq.dto;
import lombok.Data;
@Data
public class CommonDTO {
private CommonConsumerEnum type;
}

View File

@@ -0,0 +1,31 @@
package com.ruoyi.cai.mq.handle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class HandleConfig {
public static Map<String,IHandle> MAP = new HashMap<>();
@Autowired
private List<IHandle> handles;
@PostConstruct
public void init(){
for (IHandle handle : handles) {
MAP.put(handle.getType().name(),handle);
}
}
public IHandle getHandle(String type){
return MAP.get(type);
}
}

View File

@@ -0,0 +1,10 @@
package com.ruoyi.cai.mq.handle;
import com.ruoyi.cai.mq.dto.CommonConsumerEnum;
public interface IHandle {
void run(String message);
CommonConsumerEnum getType();
}

View File

@@ -0,0 +1,25 @@
package com.ruoyi.cai.mq.handle;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.cai.mq.dto.CommonConsumerEnum;
import com.ruoyi.cai.mq.handle.dto.RankDTO;
import com.ruoyi.cai.rank.RankManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RankIHandle implements IHandle {
@Autowired
private RankManager rankManager;
@Override
public void run(String message) {
RankDTO rank = JSON.parseObject(message, RankDTO.class);
rankManager.addRank(rank);
}
@Override
public CommonConsumerEnum getType() {
return CommonConsumerEnum.RANK;
}
}

View File

@@ -0,0 +1,27 @@
package com.ruoyi.cai.mq.handle.dto;
import com.ruoyi.cai.mq.dto.CommonConsumerEnum;
import com.ruoyi.cai.mq.dto.CommonDTO;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class RankDTO extends CommonDTO {
/**
* 1-魅力榜
* 2-邀请榜
*/
private Integer rankType;
private Long userId;
private Long price;
private LocalDateTime happenTime;
private Long traceId;
public RankDTO() {
this.setType(CommonConsumerEnum.RANK);
this.happenTime = LocalDateTime.now();
}
}