From 8a53930160b7ec1fa5ff80b9802ace7230e28b7e Mon Sep 17 00:00:00 2001 From: dute7liang <383200134@qq.com> Date: Fri, 29 Dec 2023 02:04:32 +0800 Subject: [PATCH] init --- .../src/main/resources/application.yml | 5 +++ .../java/com/ruoyi/cai/mq/AmqpConsumer.java | 44 +++++++++++++++++++ .../java/com/ruoyi/cai/mq/AmqpProducer.java | 26 +++++++++++ .../ruoyi/cai/mq/CheckTimeOutMqConfig.java | 35 +++++++++++++++ .../java/com/ruoyi/cai/mq/RabbitMqConfig.java | 26 +++++++++++ ruoyi-common/pom.xml | 5 +++ 6 files changed, 141 insertions(+) create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpConsumer.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/mq/RabbitMqConfig.java diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml index 4a402006..cfd50d3e 100644 --- a/ruoyi-admin/src/main/resources/application.yml +++ b/ruoyi-admin/src/main/resources/application.yml @@ -250,3 +250,8 @@ management: show-details: ALWAYS logfile: external-file: ./logs/sys-console.log +spring: + rabbitmq: + addresses: 127.0.0.1 #ip地址 + username: admin # 账号 + password: admin # 密码 diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpConsumer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpConsumer.java new file mode 100644 index 00000000..b6601946 --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpConsumer.java @@ -0,0 +1,44 @@ +package com.ruoyi.cai.mq; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class AmqpConsumer { + + public final static String CALCULATE_SALES_QUEUE = "calculateSalesQueue"; + public final static String CALCULATE_SALES_EXCHANGE = "calculateSalesExchange"; + public final static String CALCULATE_SALES_KEY = "calculateSalesKey"; + + + /** + * queuesToDeclare:支持多个队列,将队列绑定到默认交换机上,routeKey为队列名称。 + * @param msg 接收到的消息 + */ + /**@RabbitListener(queuesToDeclare = @Queue(value = "testQueue"),containerFactory = "customContainerFactory") + public void listener(String msg) { + System.out.println(msg); + }*/ + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(value = CALCULATE_SALES_QUEUE, durable = "false", autoDelete = "false"), + exchange = @Exchange(value = CALCULATE_SALES_EXCHANGE), + key = CALCULATE_SALES_KEY) + ,containerFactory = "customContainerFactory") + public void calculateSalesQueue(String message) { + log.info("calculateSalesQueue: " + message); + } + + @RabbitListener(queues = CheckTimeOutMqConfig.QUEUE_NAME + ,containerFactory = "customContainerFactory") + public void checkTimeOutMq(String message) { + log.info("checkTimeOutMq: " + message); + } + + +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java new file mode 100644 index 00000000..2cffe8ab --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java @@ -0,0 +1,26 @@ +package com.ruoyi.cai.mq; + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class AmqpProducer { + @Autowired + private RabbitTemplate rabbitTemplate; + + public void sendCalculateSales(String message,Integer timeout){ + rabbitTemplate.convertAndSend(AmqpConsumer.CALCULATE_SALES_EXCHANGE, AmqpConsumer.CALCULATE_SALES_KEY, message); + } + + + public void sendCheckTimeOut(String message,Integer timeout){ + rabbitTemplate.convertAndSend(CheckTimeOutMqConfig.EXCHANGE_NAME, + CheckTimeOutMqConfig.ROUTING_KEY, + message, + messagePostProcessor -> { + messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒 + return messagePostProcessor; + }); + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java new file mode 100644 index 00000000..1aee875f --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java @@ -0,0 +1,35 @@ +package com.ruoyi.cai.mq; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +public class CheckTimeOutMqConfig { + + public static final String EXCHANGE_NAME = "checkRoomTimeExchange"; + public static final String QUEUE_NAME = "checkRoomTimeQueue"; + public static final String ROUTING_KEY = "checkRoomTimeRouting"; + + @Bean + public CustomExchange delayedExchange() { + return new CustomExchange(EXCHANGE_NAME, + "x-delayed-message", // 消息类型 + true, // 是否持久化 + false); // 是否自动删除 + } + + @Bean + public Queue delayedQueue() { + return QueueBuilder.durable(QUEUE_NAME) + .withArgument("x-delayed-type", "direct") + .build(); + } + + @Bean + public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) { + return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs(); + } + +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/RabbitMqConfig.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/RabbitMqConfig.java new file mode 100644 index 00000000..b07192ae --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/RabbitMqConfig.java @@ -0,0 +1,26 @@ +package com.ruoyi.cai.mq; + +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitMqConfig { + + //并发数量 + public static final int DEFAULT_CONCURRENT = Runtime.getRuntime().availableProcessors(); + + @Bean("customContainerFactory") + public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, + ConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConcurrentConsumers(DEFAULT_CONCURRENT*2); + factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT*2); + configurer.configure(factory, connectionFactory); + return factory; + } + + +} diff --git a/ruoyi-common/pom.xml b/ruoyi-common/pom.xml index 198a59e1..253cc5cc 100644 --- a/ruoyi-common/pom.xml +++ b/ruoyi-common/pom.xml @@ -166,6 +166,11 @@ 27.0.1-jre + + org.springframework.boot + spring-boot-starter-amqp + +