diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/admin/init/MqControllerTest.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/admin/init/MqControllerTest.java index d93368d6..28fa06ac 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/admin/init/MqControllerTest.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/admin/init/MqControllerTest.java @@ -19,7 +19,7 @@ public class MqControllerTest { @GetMapping("/send") public void send(String message,Integer time){ - amqpProducer.sendCheckTimeOut(message,time); + amqpProducer.sendRoomSettleDelay(message,time); } @GetMapping("/send2") diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java index 807b2a98..2c93942d 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java @@ -1,6 +1,9 @@ package com.ruoyi.cai.mq; import com.alibaba.fastjson.JSON; +import com.ruoyi.cai.mq.config.RoomCheckDelayMqConfig; +import com.ruoyi.cai.mq.config.RoomSettleDelayMqConfig; +import com.ruoyi.cai.mq.config.CommonDelayMqConfig; import com.ruoyi.cai.mq.consumer.CalculateSalesQueueConsumer; import com.ruoyi.cai.mq.consumer.CommonConsumer; import com.ruoyi.cai.mq.dto.CommonDTO; @@ -24,9 +27,19 @@ public class AmqpProducer { CommonConsumer.COMMON_KEY, JSON.toJSONString(dto)); } - public void sendCheckTimeOut(String message,Integer timeout){ - rabbitTemplate.convertAndSend(CheckTimeOutMqConfig.EXCHANGE_NAME, - CheckTimeOutMqConfig.ROUTING_KEY, + public void sendRoomCheckDelay(String message, Integer timeout){ + rabbitTemplate.convertAndSend(RoomCheckDelayMqConfig.EXCHANGE_NAME, + RoomCheckDelayMqConfig.ROUTING_KEY, + message, + messagePostProcessor -> { + messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒 + return messagePostProcessor; + }); + } + + public void sendRoomSettleDelay(String message, Integer timeout){ + rabbitTemplate.convertAndSend(RoomSettleDelayMqConfig.EXCHANGE_NAME, + RoomSettleDelayMqConfig.ROUTING_KEY, message, messagePostProcessor -> { messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒 @@ -43,17 +56,4 @@ public class AmqpProducer { return messagePostProcessor; }); } - - public void sendCommonDelayMq(Integer type,Long roomId,Integer timeout){ - CommonDelayDto dto = new CommonDelayDto(); - dto.setType(type); - dto.setRoomId(roomId); - rabbitTemplate.convertAndSend(CommonDelayMqConfig.EXCHANGE_NAME, - CommonDelayMqConfig.ROUTING_KEY, - JSON.toJSONString(dto), - messagePostProcessor -> { - messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒 - return messagePostProcessor; - }); - } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayTypeEnum.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayTypeEnum.java new file mode 100644 index 00000000..83d6a1df --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayTypeEnum.java @@ -0,0 +1,14 @@ +package com.ruoyi.cai.mq; + +import lombok.Getter; + +@Getter +public enum CommonDelayTypeEnum { + USER_FORBID(1), + ; + private final Integer code; + + CommonDelayTypeEnum(Integer code) { + this.code = code; + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/CommonDelayMqConfig.java similarity index 97% rename from ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java rename to ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/CommonDelayMqConfig.java index ee9bf4b0..812b5526 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/CommonDelayMqConfig.java @@ -1,4 +1,4 @@ -package com.ruoyi.cai.mq; +package com.ruoyi.cai.mq.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/RabbitMqConfig.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/RabbitMqConfig.java similarity index 97% rename from ruoyi-cai/src/main/java/com/ruoyi/cai/mq/RabbitMqConfig.java rename to ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/RabbitMqConfig.java index b07192ae..b5140049 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/RabbitMqConfig.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/RabbitMqConfig.java @@ -1,4 +1,4 @@ -package com.ruoyi.cai.mq; +package com.ruoyi.cai.mq.config; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/RoomCheckDelayMqConfig.java similarity index 77% rename from ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java rename to ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/RoomCheckDelayMqConfig.java index 743154f2..fd5ddeaf 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/RoomCheckDelayMqConfig.java @@ -1,4 +1,4 @@ -package com.ruoyi.cai.mq; +package com.ruoyi.cai.mq.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; @@ -8,11 +8,11 @@ import java.util.HashMap; @Configuration -public class CheckTimeOutMqConfig { +public class RoomCheckDelayMqConfig { - public static final String EXCHANGE_NAME = "checkRoomTimeExchange"; - public static final String QUEUE_NAME = "checkRoomTimeQueue"; - public static final String ROUTING_KEY = "checkRoomTimeRouting"; + public static final String EXCHANGE_NAME = "roomCheckDelayExchange"; + public static final String QUEUE_NAME = "roomCheckDelayQueue"; + public static final String ROUTING_KEY = "roomCheckDelayRouting"; @Bean public CustomExchange delayedExchange() { diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/RoomSettleDelayMqConfig.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/RoomSettleDelayMqConfig.java new file mode 100644 index 00000000..81e10fd0 --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/config/RoomSettleDelayMqConfig.java @@ -0,0 +1,40 @@ +package com.ruoyi.cai.mq.config; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; + + +@Configuration +public class RoomSettleDelayMqConfig { + + public static final String EXCHANGE_NAME = "roomSettleDelayExchange"; + public static final String QUEUE_NAME = "roomSettleDelayQueue"; + public static final String ROUTING_KEY = "roomSettleDelayRouting"; + + @Bean + public CustomExchange delayedExchange() { + HashMap args = new HashMap<>(); + args.put("x-delayed-type", "direct"); + return new CustomExchange(EXCHANGE_NAME, + "x-delayed-message", // 消息类型 + true, // 是否持久化 + false,// 是否自动删除 + args); + } + + @Bean + public Queue delayedQueue() { + return QueueBuilder.durable(QUEUE_NAME) + .withArgument("x-delayed-type", "direct") + .build(); + } + + @Bean + public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) { + return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs(); + } + +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java index 292db811..aa0bf6c6 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java @@ -2,9 +2,9 @@ package com.ruoyi.cai.mq.consumer; import com.alibaba.fastjson2.JSON; import com.ruoyi.cai.manager.UserForbidManager; +import com.ruoyi.cai.mq.CommonDelayTypeEnum; +import com.ruoyi.cai.mq.config.CommonDelayMqConfig; import com.ruoyi.cai.mq.dto.CommonDelayDto; -import com.ruoyi.cai.mq.CommonDelayMqConfig; -import com.ruoyi.cai.ws.job.CheckTimeOutJob; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; @@ -13,8 +13,6 @@ import org.springframework.stereotype.Component; @Slf4j @Component public class CommonDelayMqConsumer { - @Autowired - private CheckTimeOutJob checkTimeOutJob; @Autowired private UserForbidManager userForbidManager; @@ -23,12 +21,18 @@ public class CommonDelayMqConsumer { public void checkTimeOutMq(String message) { log.info("CommonDelayMqConsumer: " + message); CommonDelayDto dto = JSON.parseObject(message, CommonDelayDto.class); - switch (dto.getType()){ - case 1: - checkTimeOutJob.deal(dto.getRoomId()); - break; - case 2: - userForbidManager.checkAll(); + CommonDelayTypeEnum typeEnum = dto.getTypeEnum(); + if(typeEnum == null){ + log.warn("延时任务执行失败,未检测到正确的类型 dto={}",JSON.toJSONString(dto)); + return; + } + switch (typeEnum){ + case USER_FORBID: + try { + userForbidManager.checkExpire(dto.getForbidId()); + }catch (Exception e){ + log.error("检查用户封禁状态失败!需要开发确认!",e); + } break; default: break; diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/RoomCheckDelayMqConsumer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/RoomCheckDelayMqConsumer.java new file mode 100644 index 00000000..afeea865 --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/RoomCheckDelayMqConsumer.java @@ -0,0 +1,36 @@ +package com.ruoyi.cai.mq.consumer; + +import com.ruoyi.cai.mq.AmqpProducer; +import com.ruoyi.cai.mq.config.RoomCheckDelayMqConfig; +import com.ruoyi.cai.ws.job.JobResp; +import com.ruoyi.cai.ws.job.RoomCheckJobService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class RoomCheckDelayMqConsumer { + @Autowired + private AmqpProducer amqpProducer; + @Autowired + private RoomCheckJobService roomCheckJobService; + + @RabbitListener(queues = RoomCheckDelayMqConfig.QUEUE_NAME + ,containerFactory = "customContainerFactory") + public void sendRoomCheck(String roomIdStr) { + log.info("开始执行房间检测: roomId={}",roomIdStr); + try { + Long roomId = Long.valueOf(roomIdStr); + JobResp resp = roomCheckJobService.checkRoom(roomId); + if(resp.isNextCreateJob()){ + // 1分钟后继续执行 + amqpProducer.sendRoomCheckDelay(roomIdStr,60); + } + }catch (Exception e){ + log.error("每分钟定时检测房间失败! roomId={}",roomIdStr); + } + + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/RoomSettleDelayMqConsumer.java similarity index 77% rename from ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java rename to ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/RoomSettleDelayMqConsumer.java index 77097bd7..4c675b2a 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/RoomSettleDelayMqConsumer.java @@ -1,23 +1,22 @@ package com.ruoyi.cai.mq.consumer; import com.ruoyi.cai.mq.AmqpProducer; -import com.ruoyi.cai.mq.CheckTimeOutMqConfig; +import com.ruoyi.cai.mq.config.RoomSettleDelayMqConfig; import com.ruoyi.cai.ws.service.SettleService; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.index.qual.SameLen; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component -public class CheckTimeOutMqConsumer { +public class RoomSettleDelayMqConsumer { @Autowired private SettleService settleService; @Autowired private AmqpProducer amqpProducer; - @RabbitListener(queues = CheckTimeOutMqConfig.QUEUE_NAME + @RabbitListener(queues = RoomSettleDelayMqConfig.QUEUE_NAME ,containerFactory = "customContainerFactory") public void checkTimeOutMq(String message) { log.info("开始执行预扣费: " + message); @@ -25,7 +24,7 @@ public class CheckTimeOutMqConsumer { boolean next = settleService.withholdingFee(Long.valueOf(message)); if(next){ // 1分钟后继续执行 - amqpProducer.sendCheckTimeOut(message,60); + amqpProducer.sendRoomSettleDelay(message,60); } }catch (Exception e){ log.error("每分钟定时扣费失败!",e); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonDelayDto.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonDelayDto.java index 11e9e209..1ba3b6ed 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonDelayDto.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonDelayDto.java @@ -1,11 +1,23 @@ package com.ruoyi.cai.mq.dto; +import com.ruoyi.cai.mq.CommonDelayTypeEnum; import lombok.Data; @Data public class CommonDelayDto { - // 1- 房间检测 2-黑名单检测 + /** + * @see com.ruoyi.cai.mq.CommonDelayTypeEnum + */ private Integer type; - private Long roomId; private Long forbidId; + + public CommonDelayTypeEnum getTypeEnum(){ + CommonDelayTypeEnum[] values = CommonDelayTypeEnum.values(); + for (CommonDelayTypeEnum value : values) { + if(value.getCode().equals(this.type)){ + return value; + } + } + return null; + } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java index 4ae9bece..2c483c0c 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java @@ -4,16 +4,12 @@ import com.ruoyi.cai.ws.constant.RedisConstant; import org.redisson.api.RSet; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; -import java.util.List; import java.util.Set; @Component public class OnlineDataCache { - @Autowired - private StringRedisTemplate redisTemplate; @Autowired private RedissonClient redissonClient; @@ -24,18 +20,15 @@ public class OnlineDataCache { public Set getAll(){ RSet set = redissonClient.getSet(getKey()); return set.readAll(); -// return redisTemplate.opsForSet().members(getKey()); } public void add(Long roomId){ RSet set = redissonClient.getSet(getKey()); set.add(String.valueOf(roomId)); -// redisTemplate.opsForSet().add(getKey(), String.valueOf(roomId)); } public void remove(Long roomId) { RSet set = redissonClient.getSet(getKey()); - set.delete(); -// redisTemplate.opsForSet().remove(getKey(),roomId); + set.remove(roomId+""); } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/AgreeMessageHandle.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/AgreeMessageHandle.java index cb0b37e7..ef432fd6 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/AgreeMessageHandle.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/AgreeMessageHandle.java @@ -44,7 +44,7 @@ public class AgreeMessageHandle extends AbstractMessageHandle implements IMessag .eq(UserCall::getId,room.getRoomId()) .set(UserCall::getStatus, RoomStatusEnums.STATUS_AGREE.getCode()) .set(UserCall::getBeginTime, LocalDateTime.now())); - amqpProducer.sendCheckTimeOut(room.getRoomId()+"",60); // 1分钟延时消息,开始扣钱 + amqpProducer.sendRoomSettleDelay(room.getRoomId()+"",60); // 1分钟延时消息,开始扣钱 String message = "提示:禁止任何涉黄、任何微信QQ引导到其它平台行为"; sendToAll(room.getRoomId(), WsRMsgGen.startVideo(room.getRoomId(),0L),WsRMsgGen.sysNotice(message)); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java deleted file mode 100644 index fc9859e0..00000000 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.ruoyi.cai.ws.job; - -import cn.hutool.core.date.DateUtil; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; -import com.ruoyi.cai.domain.UserCall; -import com.ruoyi.cai.service.UserCallService; -import com.ruoyi.cai.trd.ImDataRes; -import com.ruoyi.cai.trd.ImMsgGen; -import com.ruoyi.cai.ws.bean.Room; -import com.ruoyi.cai.ws.bean.UserData; -import com.ruoyi.cai.ws.cache.OnlineDataCache; -import com.ruoyi.cai.ws.constant.RoomStatusEnums; -import com.ruoyi.cai.ws.service.RoomService; -import com.ruoyi.cai.ws.service.SettleService; -import com.ruoyi.yunxin.Yunxin; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.Set; - -@Component -@Slf4j -public class CheckTimeOutJob { - @Autowired - private OnlineDataCache onlineDataCache; - @Autowired - private RoomService roomService; - @Autowired - private UserCallService userCallService; - @Autowired - private SettleService settleService; - @Autowired - private Yunxin yunxin; - - /** - * 检查房间是不是三分钟没有接听, 需要自动挂断掉 - */ -// @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS) - public void run(){ - Set roomIdStr = onlineDataCache.getAll(); - for (String roomIdS : roomIdStr) { - Long roomId = Long.valueOf(roomIdS); - this.deal(roomId); - } - } - - public void deal(Long roomId){ - log.info("开始执行房间超时检测,是否一直不接 roomId={}",roomId); - Room room = roomService.load(roomId); - if(room == null){ - return; - } - if(!RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus()) - && !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){ - return; - } - UserData callUserData = room.getCallUserData(); - UserData receiverUserData = room.getReceiverUserData(); - // 检测是不是3分钟没有接听 - Long connectTimeCaller = callUserData.getConnectTime(); - Long connectTimeReceiver = receiverUserData.getConnectTime(); - boolean timeOut = false; - if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){ - timeOut = true; - }else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){ - timeOut = true; - } - if(timeOut){ - if(roomService.hangUp(roomId)){ - userCallService.update(Wrappers.lambdaUpdate(UserCall.class) - .eq(UserCall::getId,roomId) - .set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode())); - // TODO 需要放开主播的接听状态 - roomService.closeAllFd(roomId); - ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0); - yunxin.sendToSync(receiverUserData.getId(),callUserData.getId(),imDataRes); - } - } - } -} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java deleted file mode 100644 index 756bfe51..00000000 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.ruoyi.cai.ws.job; - -import cn.hutool.core.date.DateUtil; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; -import com.ruoyi.cai.domain.UserCall; -import com.ruoyi.cai.service.UserCallService; -import com.ruoyi.cai.ws.bean.Room; -import com.ruoyi.cai.ws.bean.UserData; -import com.ruoyi.cai.ws.cache.OnlineDataCache; -import com.ruoyi.cai.ws.cache.RoomCtxCache; -import com.ruoyi.cai.ws.constant.HangUpEnums; -import com.ruoyi.cai.ws.constant.RoomStatusEnums; -import com.ruoyi.cai.ws.dto.WsR; -import com.ruoyi.cai.ws.dto.WsRMsgGen; -import com.ruoyi.cai.ws.service.RoomService; -import com.ruoyi.cai.ws.service.SettleService; -import com.ruoyi.cai.ws.util.RoomWebSocketUtil; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -@Component -@Slf4j -public class HeartbeatJob { - - @Autowired - private OnlineDataCache onlineDataCache; - @Autowired - private RoomService roomService; - @Autowired - private UserCallService userCallService; - @Autowired - private SettleService settleService; - @Autowired - private RoomCtxCache roomCtxCache; - - - /** - * 30秒执行一次 心跳只处理接听后的心跳 - */ - @Scheduled(fixedDelay = 30,timeUnit = TimeUnit.SECONDS) - public void run(){ - Set all = onlineDataCache.getAll(); - for (String roomIdS : all) { - try { - Long roomId = Long.valueOf(roomIdS); - Room room = roomService.load(roomId); - if(room == null){ - return; - } - if(!room.isCanCall()){ - return; - } - UserData callUserData = room.getCallUserData(); - UserData receiverUserData = room.getReceiverUserData(); - boolean timeOut = false; - WsR hangup = null; - if(isHeartTimeout(callUserData)){ - timeOut = true; - hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode()); - }else if(isHeartTimeout(receiverUserData)){ - timeOut = true; - hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode()); - } - if(timeOut){ - if(roomService.hangUp(roomId)){ - userCallService.update(Wrappers.lambdaUpdate(UserCall.class) - .eq(UserCall::getId,roomId) - .set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode())); - settleService.processOn(roomId); - } - List keys = roomCtxCache.getSessionKeysByRoomId(roomId); - RoomWebSocketUtil.sendSendMessage(keys, hangup); - roomService.closeAllFd(roomId); - } - }catch (Exception e){ - log.error("定时心跳检测失败!",e); - } - - } - } - - /** - * 是否心跳超时 - * @param userData - * @return - */ - private boolean isHeartTimeout(UserData userData){ - if(userData.getConnectTime() == null){ - return false; - } - if(DateUtil.currentSeconds() - userData.getHeartTime() < 120){ - return false; - } - return true; - } -} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/RoomCheckJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/RoomCheckJob.java index ce55a1fe..485ae779 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/RoomCheckJob.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/RoomCheckJob.java @@ -1,158 +1,37 @@ package com.ruoyi.cai.ws.job; -import cn.hutool.core.date.DateUtil; -import com.baomidou.mybatisplus.core.toolkit.Wrappers; -import com.ruoyi.cai.domain.UserCall; -import com.ruoyi.cai.service.UserCallService; -import com.ruoyi.cai.trd.ImDataRes; -import com.ruoyi.cai.trd.ImMsgGen; -import com.ruoyi.cai.ws.bean.Room; -import com.ruoyi.cai.ws.bean.UserData; -import com.ruoyi.cai.ws.cache.RoomCtxCache; -import com.ruoyi.cai.ws.constant.HangUpEnums; -import com.ruoyi.cai.ws.constant.RoomStatusEnums; -import com.ruoyi.cai.ws.dto.WsR; -import com.ruoyi.cai.ws.dto.WsRMsgGen; +import com.ruoyi.cai.ws.cache.OnlineDataCache; import com.ruoyi.cai.ws.service.RoomService; -import com.ruoyi.cai.ws.service.SettleResp; import com.ruoyi.cai.ws.service.SettleService; -import com.ruoyi.cai.ws.util.RoomWebSocketUtil; -import com.ruoyi.yunxin.Yunxin; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; @Component +@Slf4j public class RoomCheckJob { - @Autowired - private RoomService roomService; + private OnlineDataCache onlineDataCache; @Autowired - private UserCallService userCallService; - @Autowired - private Yunxin yunxin; - @Autowired - private SettleService settleService; - @Autowired - private RoomCtxCache roomCtxCache; - - public JobResp checkRoom(Long roomId){ - Room room = roomService.load(roomId); - if(room == null){ - return JobResp.builder().nextCreateJob(false).build(); - } - // 检查是否三分钟没有接听 - JobResp resp = this.checkRoomCallerTimeOut(room); - if(!resp.isNextCreateJob()){ - return resp; - } - // 检查心跳 - JobResp heartbeat = checkRoomHeartbeat(room); - if(!heartbeat.isNextCreateJob()){ - return heartbeat; - } - this.checkCanCallTime(room); - // 尝试结算一下 - SettleResp settleResp = settleService.processOn(roomId); - return JobResp.builder().nextCreateJob(settleResp.isNextRun()).build(); - } - - - private void checkCanCallTime(Room room){ - Long time = roomService.canCallTime(room); - if(time < 150 && time > 60){ // 提示余额不足 - String sessionKey = roomCtxCache.getSessionKeyCallerByRoomId(room.getRoomId()); - RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.rechargeNotice("您的余额不足,点此充值")); - } - } - - private JobResp checkRoomHeartbeat(Room room){ - if(!room.isCanCall()){ - return JobResp.builder().nextCreateJob(true).build(); - } - Long roomId = room.getRoomId(); - UserData callUserData = room.getCallUserData(); - UserData receiverUserData = room.getReceiverUserData(); - boolean timeOut = false; - WsR hangup = null; - if(isHeartTimeout(callUserData)){ - timeOut = true; - hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode()); - }else if(isHeartTimeout(receiverUserData)){ - timeOut = true; - hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode()); - } - if(timeOut){ - boolean nextCreateJob = false; - if(roomService.hangUp(roomId)){ - userCallService.update(Wrappers.lambdaUpdate(UserCall.class) - .eq(UserCall::getId,roomId) - .set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode())); - SettleResp settleResp = settleService.processOn(roomId); - nextCreateJob = settleResp.isNextRun(); - } - List keys = roomCtxCache.getSessionKeysByRoomId(roomId); - RoomWebSocketUtil.sendSendMessage(keys, hangup); - roomService.closeAllFd(roomId); - return JobResp.builder().nextCreateJob(nextCreateJob).build(); - } - return JobResp.builder().nextCreateJob(true).build(); - } - - - // 检测是不是3分钟没有接听 - private JobResp checkRoomCallerTimeOut(Room room){ - if(RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus()) - && RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())) { - return JobResp.builder().nextCreateJob(true).build(); - } - UserData callUserData = room.getCallUserData(); - UserData receiverUserData = room.getReceiverUserData(); - Long connectTimeCaller = callUserData.getConnectTime(); - Long connectTimeReceiver = receiverUserData.getConnectTime(); - boolean timeOut = false; - if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){ - timeOut = true; - }else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){ - timeOut = true; - } - if(timeOut){ - Long roomId = room.getRoomId(); - boolean nextCreateJob = false; - if(roomService.hangUp(roomId)){ - userCallService.update(Wrappers.lambdaUpdate(UserCall.class) - .eq(UserCall::getId,roomId) - .set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode())); - SettleResp settleResp = settleService.processOn(roomId); - nextCreateJob = settleResp.isNextRun(); - } - roomService.closeAllFd(roomId); - ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0); - yunxin.sendToSync(receiverUserData.getId(),callUserData.getId(),imDataRes); - return JobResp.builder().nextCreateJob(nextCreateJob).build(); - } - return JobResp.builder().nextCreateJob(true).build(); - } - + private RoomCheckJobService roomCheckJobService; /** - * 是否心跳超时 - * @param userData - * @return + * 每 7分钟执行一次 */ - private boolean isHeartTimeout(UserData userData){ - if(userData.getConnectTime() == null){ - return false; + @Scheduled(fixedDelay = 7,timeUnit = TimeUnit.MINUTES) + public void run(){ + Set all = onlineDataCache.getAll(); + for (String roomIdS : all) { + try { + Long roomId = Long.valueOf(roomIdS); + roomCheckJobService.checkRoom(roomId); + }catch (Exception e){ + log.info("定时检测房间失败!需要开发排查问题 roomId={}", roomIdS, e); + } } - if(DateUtil.currentSeconds() - userData.getHeartTime() < 117){ - return false; - } - return true; } - - - - - } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/RoomCheckJobService.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/RoomCheckJobService.java new file mode 100644 index 00000000..5647de51 --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/RoomCheckJobService.java @@ -0,0 +1,158 @@ +package com.ruoyi.cai.ws.job; + +import cn.hutool.core.date.DateUtil; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.ruoyi.cai.domain.UserCall; +import com.ruoyi.cai.service.UserCallService; +import com.ruoyi.cai.trd.ImDataRes; +import com.ruoyi.cai.trd.ImMsgGen; +import com.ruoyi.cai.ws.bean.Room; +import com.ruoyi.cai.ws.bean.UserData; +import com.ruoyi.cai.ws.cache.RoomCtxCache; +import com.ruoyi.cai.ws.constant.HangUpEnums; +import com.ruoyi.cai.ws.constant.RoomStatusEnums; +import com.ruoyi.cai.ws.dto.WsR; +import com.ruoyi.cai.ws.dto.WsRMsgGen; +import com.ruoyi.cai.ws.service.RoomService; +import com.ruoyi.cai.ws.service.SettleResp; +import com.ruoyi.cai.ws.service.SettleService; +import com.ruoyi.cai.ws.util.RoomWebSocketUtil; +import com.ruoyi.yunxin.Yunxin; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +public class RoomCheckJobService { + + @Autowired + private RoomService roomService; + @Autowired + private UserCallService userCallService; + @Autowired + private Yunxin yunxin; + @Autowired + private SettleService settleService; + @Autowired + private RoomCtxCache roomCtxCache; + + public JobResp checkRoom(Long roomId){ + Room room = roomService.load(roomId); + if(room == null){ + return JobResp.builder().nextCreateJob(false).build(); + } + // 检查是否三分钟没有接听 + JobResp resp = this.checkRoomCallerTimeOut(room); + if(!resp.isNextCreateJob()){ + return resp; + } + // 检查心跳 + JobResp heartbeat = checkRoomHeartbeat(room); + if(!heartbeat.isNextCreateJob()){ + return heartbeat; + } + this.checkCanCallTime(room); + // 尝试结算一下 + SettleResp settleResp = settleService.processOn(roomId); + return JobResp.builder().nextCreateJob(settleResp.isNextRun()).build(); + } + + + private void checkCanCallTime(Room room){ + Long time = roomService.canCallTime(room); + if(time < 150 && time > 60){ // 提示余额不足 + String sessionKey = roomCtxCache.getSessionKeyCallerByRoomId(room.getRoomId()); + RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.rechargeNotice("您的余额不足,点此充值")); + } + } + + private JobResp checkRoomHeartbeat(Room room){ + if(!room.isCanCall()){ + return JobResp.builder().nextCreateJob(true).build(); + } + Long roomId = room.getRoomId(); + UserData callUserData = room.getCallUserData(); + UserData receiverUserData = room.getReceiverUserData(); + boolean timeOut = false; + WsR hangup = null; + if(isHeartTimeout(callUserData)){ + timeOut = true; + hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode()); + }else if(isHeartTimeout(receiverUserData)){ + timeOut = true; + hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode()); + } + if(timeOut){ + boolean nextCreateJob = false; + if(roomService.hangUp(roomId)){ + userCallService.update(Wrappers.lambdaUpdate(UserCall.class) + .eq(UserCall::getId,roomId) + .set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode())); + SettleResp settleResp = settleService.processOn(roomId); + nextCreateJob = settleResp.isNextRun(); + } + List keys = roomCtxCache.getSessionKeysByRoomId(roomId); + RoomWebSocketUtil.sendSendMessage(keys, hangup); + roomService.closeAllFd(roomId); + return JobResp.builder().nextCreateJob(nextCreateJob).build(); + } + return JobResp.builder().nextCreateJob(true).build(); + } + + + // 检测是不是3分钟没有接听 + private JobResp checkRoomCallerTimeOut(Room room){ + if(RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus()) + && RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())) { + return JobResp.builder().nextCreateJob(true).build(); + } + UserData callUserData = room.getCallUserData(); + UserData receiverUserData = room.getReceiverUserData(); + Long connectTimeCaller = callUserData.getConnectTime(); + Long connectTimeReceiver = receiverUserData.getConnectTime(); + boolean timeOut = false; + if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){ + timeOut = true; + }else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){ + timeOut = true; + } + if(timeOut){ + Long roomId = room.getRoomId(); + boolean nextCreateJob = false; + if(roomService.hangUp(roomId)){ + userCallService.update(Wrappers.lambdaUpdate(UserCall.class) + .eq(UserCall::getId,roomId) + .set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode())); + SettleResp settleResp = settleService.processOn(roomId); + nextCreateJob = settleResp.isNextRun(); + } + roomService.closeAllFd(roomId); + ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0); + yunxin.sendToSync(receiverUserData.getId(),callUserData.getId(),imDataRes); + return JobResp.builder().nextCreateJob(nextCreateJob).build(); + } + return JobResp.builder().nextCreateJob(true).build(); + } + + + /** + * 是否心跳超时 + * @param userData + * @return + */ + private boolean isHeartTimeout(UserData userData){ + if(userData.getConnectTime() == null){ + return false; + } + if(DateUtil.currentSeconds() - userData.getHeartTime() < 117){ + return false; + } + return true; + } + + + + + +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java deleted file mode 100644 index 69f0a78a..00000000 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.ruoyi.cai.ws.job; - -import com.ruoyi.cai.ws.bean.Room; -import com.ruoyi.cai.ws.cache.OnlineDataCache; -import com.ruoyi.cai.ws.service.RoomService; -import com.ruoyi.cai.ws.service.SettleService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.util.Set; -import java.util.concurrent.TimeUnit; - -@Component -@Slf4j -public class SettleJob { - @Autowired - private OnlineDataCache onlineDataCache; - @Autowired - private SettleService settleService; - @Autowired - private RoomService roomService; - - /** - * 每 1 分钟执行一次 - */ -// @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS) - public void run(){ - Set all = onlineDataCache.getAll(); - for (String roomIdS : all) { - try { - Long roomId = Long.valueOf(roomIdS); - settleService.processOn(roomId); - }catch (Exception e){ - log.info("定时任务结算失败!",e); - } - } - } -} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java index 00d7f409..b32fb6db 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java @@ -122,8 +122,8 @@ public class OpenLogic { userDataCache.hMSet(room.getRoomId(), UserDataConstant.TYPE_CALLER,map); roomDataCache.setStatus(room.getRoomId(),RoomStatusEnums.STATUS_CALLER_CONNECT); onlineDataCache.add(room.getRoomId()); + amqpProducer.sendRoomCheckDelay(room.getRoomId()+"",60); userService.updateVideoStatus(userId,1); - amqpProducer.sendCommonDelayMq(1,room.getRoomId(),182); isFirst = true; } // 已经接通 @@ -169,7 +169,6 @@ public class OpenLogic { map.put("connectTime", DateUtil.currentSeconds()); map.put("heartTime",DateUtil.currentSeconds()); userDataCache.hMSet(room.getRoomId(), UserDataConstant.TYPE_RECEIVER,map); - amqpProducer.sendCommonDelayMq(1,room.getRoomId(),182); // 房间号状态设置为 接收方已连接 boolean res = roomDataCache.setStatusReceiverConnection(room.getRoomId()); if(!res){