This commit is contained in:
dute7liang
2023-12-29 02:04:32 +08:00
parent 934a613f11
commit 8a53930160
6 changed files with 141 additions and 0 deletions

View File

@@ -0,0 +1,44 @@
package com.ruoyi.cai.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class AmqpConsumer {
public final static String CALCULATE_SALES_QUEUE = "calculateSalesQueue";
public final static String CALCULATE_SALES_EXCHANGE = "calculateSalesExchange";
public final static String CALCULATE_SALES_KEY = "calculateSalesKey";
/**
* queuesToDeclare支持多个队列将队列绑定到默认交换机上routeKey为队列名称。
* @param msg 接收到的消息
*/
/**@RabbitListener(queuesToDeclare = @Queue(value = "testQueue"),containerFactory = "customContainerFactory")
public void listener(String msg) {
System.out.println(msg);
}*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = CALCULATE_SALES_QUEUE, durable = "false", autoDelete = "false"),
exchange = @Exchange(value = CALCULATE_SALES_EXCHANGE),
key = CALCULATE_SALES_KEY)
,containerFactory = "customContainerFactory")
public void calculateSalesQueue(String message) {
log.info("calculateSalesQueue: " + message);
}
@RabbitListener(queues = CheckTimeOutMqConfig.QUEUE_NAME
,containerFactory = "customContainerFactory")
public void checkTimeOutMq(String message) {
log.info("checkTimeOutMq: " + message);
}
}

View File

@@ -0,0 +1,26 @@
package com.ruoyi.cai.mq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class AmqpProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendCalculateSales(String message,Integer timeout){
rabbitTemplate.convertAndSend(AmqpConsumer.CALCULATE_SALES_EXCHANGE, AmqpConsumer.CALCULATE_SALES_KEY, message);
}
public void sendCheckTimeOut(String message,Integer timeout){
rabbitTemplate.convertAndSend(CheckTimeOutMqConfig.EXCHANGE_NAME,
CheckTimeOutMqConfig.ROUTING_KEY,
message,
messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒
return messagePostProcessor;
});
}
}

View File

@@ -0,0 +1,35 @@
package com.ruoyi.cai.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CheckTimeOutMqConfig {
public static final String EXCHANGE_NAME = "checkRoomTimeExchange";
public static final String QUEUE_NAME = "checkRoomTimeQueue";
public static final String ROUTING_KEY = "checkRoomTimeRouting";
@Bean
public CustomExchange delayedExchange() {
return new CustomExchange(EXCHANGE_NAME,
"x-delayed-message", // 消息类型
true, // 是否持久化
false); // 是否自动删除
}
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(QUEUE_NAME)
.withArgument("x-delayed-type", "direct")
.build();
}
@Bean
public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();
}
}

View File

@@ -0,0 +1,26 @@
package com.ruoyi.cai.mq;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
//并发数量
public static final int DEFAULT_CONCURRENT = Runtime.getRuntime().availableProcessors();
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(DEFAULT_CONCURRENT*2);
factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT*2);
configurer.configure(factory, connectionFactory);
return factory;
}
}