This commit is contained in:
张良(004796)
2024-01-25 17:16:23 +08:00
parent 31036e69af
commit 7874e38d17
19 changed files with 326 additions and 415 deletions

View File

@@ -19,7 +19,7 @@ public class MqControllerTest {
@GetMapping("/send") @GetMapping("/send")
public void send(String message,Integer time){ public void send(String message,Integer time){
amqpProducer.sendCheckTimeOut(message,time); amqpProducer.sendRoomSettleDelay(message,time);
} }
@GetMapping("/send2") @GetMapping("/send2")

View File

@@ -1,6 +1,9 @@
package com.ruoyi.cai.mq; package com.ruoyi.cai.mq;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.ruoyi.cai.mq.config.RoomCheckDelayMqConfig;
import com.ruoyi.cai.mq.config.RoomSettleDelayMqConfig;
import com.ruoyi.cai.mq.config.CommonDelayMqConfig;
import com.ruoyi.cai.mq.consumer.CalculateSalesQueueConsumer; import com.ruoyi.cai.mq.consumer.CalculateSalesQueueConsumer;
import com.ruoyi.cai.mq.consumer.CommonConsumer; import com.ruoyi.cai.mq.consumer.CommonConsumer;
import com.ruoyi.cai.mq.dto.CommonDTO; import com.ruoyi.cai.mq.dto.CommonDTO;
@@ -24,9 +27,19 @@ public class AmqpProducer {
CommonConsumer.COMMON_KEY, JSON.toJSONString(dto)); CommonConsumer.COMMON_KEY, JSON.toJSONString(dto));
} }
public void sendCheckTimeOut(String message,Integer timeout){ public void sendRoomCheckDelay(String message, Integer timeout){
rabbitTemplate.convertAndSend(CheckTimeOutMqConfig.EXCHANGE_NAME, rabbitTemplate.convertAndSend(RoomCheckDelayMqConfig.EXCHANGE_NAME,
CheckTimeOutMqConfig.ROUTING_KEY, RoomCheckDelayMqConfig.ROUTING_KEY,
message,
messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒
return messagePostProcessor;
});
}
public void sendRoomSettleDelay(String message, Integer timeout){
rabbitTemplate.convertAndSend(RoomSettleDelayMqConfig.EXCHANGE_NAME,
RoomSettleDelayMqConfig.ROUTING_KEY,
message, message,
messagePostProcessor -> { messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒 messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒
@@ -43,17 +56,4 @@ public class AmqpProducer {
return messagePostProcessor; return messagePostProcessor;
}); });
} }
public void sendCommonDelayMq(Integer type,Long roomId,Integer timeout){
CommonDelayDto dto = new CommonDelayDto();
dto.setType(type);
dto.setRoomId(roomId);
rabbitTemplate.convertAndSend(CommonDelayMqConfig.EXCHANGE_NAME,
CommonDelayMqConfig.ROUTING_KEY,
JSON.toJSONString(dto),
messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒
return messagePostProcessor;
});
}
} }

View File

@@ -0,0 +1,14 @@
package com.ruoyi.cai.mq;
import lombok.Getter;
@Getter
public enum CommonDelayTypeEnum {
USER_FORBID(1),
;
private final Integer code;
CommonDelayTypeEnum(Integer code) {
this.code = code;
}
}

View File

@@ -1,4 +1,4 @@
package com.ruoyi.cai.mq; package com.ruoyi.cai.mq.config;
import org.springframework.amqp.core.*; import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;

View File

@@ -1,4 +1,4 @@
package com.ruoyi.cai.mq; package com.ruoyi.cai.mq.config;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;

View File

@@ -1,4 +1,4 @@
package com.ruoyi.cai.mq; package com.ruoyi.cai.mq.config;
import org.springframework.amqp.core.*; import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@@ -8,11 +8,11 @@ import java.util.HashMap;
@Configuration @Configuration
public class CheckTimeOutMqConfig { public class RoomCheckDelayMqConfig {
public static final String EXCHANGE_NAME = "checkRoomTimeExchange"; public static final String EXCHANGE_NAME = "roomCheckDelayExchange";
public static final String QUEUE_NAME = "checkRoomTimeQueue"; public static final String QUEUE_NAME = "roomCheckDelayQueue";
public static final String ROUTING_KEY = "checkRoomTimeRouting"; public static final String ROUTING_KEY = "roomCheckDelayRouting";
@Bean @Bean
public CustomExchange delayedExchange() { public CustomExchange delayedExchange() {

View File

@@ -0,0 +1,40 @@
package com.ruoyi.cai.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@Configuration
public class RoomSettleDelayMqConfig {
public static final String EXCHANGE_NAME = "roomSettleDelayExchange";
public static final String QUEUE_NAME = "roomSettleDelayQueue";
public static final String ROUTING_KEY = "roomSettleDelayRouting";
@Bean
public CustomExchange delayedExchange() {
HashMap<String,Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(EXCHANGE_NAME,
"x-delayed-message", // 消息类型
true, // 是否持久化
false,// 是否自动删除
args);
}
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(QUEUE_NAME)
.withArgument("x-delayed-type", "direct")
.build();
}
@Bean
public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();
}
}

View File

@@ -2,9 +2,9 @@ package com.ruoyi.cai.mq.consumer;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.ruoyi.cai.manager.UserForbidManager; import com.ruoyi.cai.manager.UserForbidManager;
import com.ruoyi.cai.mq.CommonDelayTypeEnum;
import com.ruoyi.cai.mq.config.CommonDelayMqConfig;
import com.ruoyi.cai.mq.dto.CommonDelayDto; import com.ruoyi.cai.mq.dto.CommonDelayDto;
import com.ruoyi.cai.mq.CommonDelayMqConfig;
import com.ruoyi.cai.ws.job.CheckTimeOutJob;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -13,8 +13,6 @@ import org.springframework.stereotype.Component;
@Slf4j @Slf4j
@Component @Component
public class CommonDelayMqConsumer { public class CommonDelayMqConsumer {
@Autowired
private CheckTimeOutJob checkTimeOutJob;
@Autowired @Autowired
private UserForbidManager userForbidManager; private UserForbidManager userForbidManager;
@@ -23,12 +21,18 @@ public class CommonDelayMqConsumer {
public void checkTimeOutMq(String message) { public void checkTimeOutMq(String message) {
log.info("CommonDelayMqConsumer: " + message); log.info("CommonDelayMqConsumer: " + message);
CommonDelayDto dto = JSON.parseObject(message, CommonDelayDto.class); CommonDelayDto dto = JSON.parseObject(message, CommonDelayDto.class);
switch (dto.getType()){ CommonDelayTypeEnum typeEnum = dto.getTypeEnum();
case 1: if(typeEnum == null){
checkTimeOutJob.deal(dto.getRoomId()); log.warn("延时任务执行失败,未检测到正确的类型 dto={}",JSON.toJSONString(dto));
break; return;
case 2: }
userForbidManager.checkAll(); switch (typeEnum){
case USER_FORBID:
try {
userForbidManager.checkExpire(dto.getForbidId());
}catch (Exception e){
log.error("检查用户封禁状态失败!需要开发确认!",e);
}
break; break;
default: default:
break; break;

View File

@@ -0,0 +1,36 @@
package com.ruoyi.cai.mq.consumer;
import com.ruoyi.cai.mq.AmqpProducer;
import com.ruoyi.cai.mq.config.RoomCheckDelayMqConfig;
import com.ruoyi.cai.ws.job.JobResp;
import com.ruoyi.cai.ws.job.RoomCheckJobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RoomCheckDelayMqConsumer {
@Autowired
private AmqpProducer amqpProducer;
@Autowired
private RoomCheckJobService roomCheckJobService;
@RabbitListener(queues = RoomCheckDelayMqConfig.QUEUE_NAME
,containerFactory = "customContainerFactory")
public void sendRoomCheck(String roomIdStr) {
log.info("开始执行房间检测: roomId={}",roomIdStr);
try {
Long roomId = Long.valueOf(roomIdStr);
JobResp resp = roomCheckJobService.checkRoom(roomId);
if(resp.isNextCreateJob()){
// 1分钟后继续执行
amqpProducer.sendRoomCheckDelay(roomIdStr,60);
}
}catch (Exception e){
log.error("每分钟定时检测房间失败! roomId={}",roomIdStr);
}
}
}

View File

@@ -1,23 +1,22 @@
package com.ruoyi.cai.mq.consumer; package com.ruoyi.cai.mq.consumer;
import com.ruoyi.cai.mq.AmqpProducer; import com.ruoyi.cai.mq.AmqpProducer;
import com.ruoyi.cai.mq.CheckTimeOutMqConfig; import com.ruoyi.cai.mq.config.RoomSettleDelayMqConfig;
import com.ruoyi.cai.ws.service.SettleService; import com.ruoyi.cai.ws.service.SettleService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.index.qual.SameLen;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Slf4j @Slf4j
@Component @Component
public class CheckTimeOutMqConsumer { public class RoomSettleDelayMqConsumer {
@Autowired @Autowired
private SettleService settleService; private SettleService settleService;
@Autowired @Autowired
private AmqpProducer amqpProducer; private AmqpProducer amqpProducer;
@RabbitListener(queues = CheckTimeOutMqConfig.QUEUE_NAME @RabbitListener(queues = RoomSettleDelayMqConfig.QUEUE_NAME
,containerFactory = "customContainerFactory") ,containerFactory = "customContainerFactory")
public void checkTimeOutMq(String message) { public void checkTimeOutMq(String message) {
log.info("开始执行预扣费: " + message); log.info("开始执行预扣费: " + message);
@@ -25,7 +24,7 @@ public class CheckTimeOutMqConsumer {
boolean next = settleService.withholdingFee(Long.valueOf(message)); boolean next = settleService.withholdingFee(Long.valueOf(message));
if(next){ if(next){
// 1分钟后继续执行 // 1分钟后继续执行
amqpProducer.sendCheckTimeOut(message,60); amqpProducer.sendRoomSettleDelay(message,60);
} }
}catch (Exception e){ }catch (Exception e){
log.error("每分钟定时扣费失败!",e); log.error("每分钟定时扣费失败!",e);

View File

@@ -1,11 +1,23 @@
package com.ruoyi.cai.mq.dto; package com.ruoyi.cai.mq.dto;
import com.ruoyi.cai.mq.CommonDelayTypeEnum;
import lombok.Data; import lombok.Data;
@Data @Data
public class CommonDelayDto { public class CommonDelayDto {
// 1- 房间检测 2-黑名单检测 /**
* @see com.ruoyi.cai.mq.CommonDelayTypeEnum
*/
private Integer type; private Integer type;
private Long roomId;
private Long forbidId; private Long forbidId;
public CommonDelayTypeEnum getTypeEnum(){
CommonDelayTypeEnum[] values = CommonDelayTypeEnum.values();
for (CommonDelayTypeEnum value : values) {
if(value.getCode().equals(this.type)){
return value;
}
}
return null;
}
} }

View File

@@ -4,16 +4,12 @@ import com.ruoyi.cai.ws.constant.RedisConstant;
import org.redisson.api.RSet; import org.redisson.api.RSet;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Set; import java.util.Set;
@Component @Component
public class OnlineDataCache { public class OnlineDataCache {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired @Autowired
private RedissonClient redissonClient; private RedissonClient redissonClient;
@@ -24,18 +20,15 @@ public class OnlineDataCache {
public Set<String> getAll(){ public Set<String> getAll(){
RSet<String> set = redissonClient.getSet(getKey()); RSet<String> set = redissonClient.getSet(getKey());
return set.readAll(); return set.readAll();
// return redisTemplate.opsForSet().members(getKey());
} }
public void add(Long roomId){ public void add(Long roomId){
RSet<String> set = redissonClient.getSet(getKey()); RSet<String> set = redissonClient.getSet(getKey());
set.add(String.valueOf(roomId)); set.add(String.valueOf(roomId));
// redisTemplate.opsForSet().add(getKey(), String.valueOf(roomId));
} }
public void remove(Long roomId) { public void remove(Long roomId) {
RSet<String> set = redissonClient.getSet(getKey()); RSet<String> set = redissonClient.getSet(getKey());
set.delete(); set.remove(roomId+"");
// redisTemplate.opsForSet().remove(getKey(),roomId);
} }
} }

View File

@@ -44,7 +44,7 @@ public class AgreeMessageHandle extends AbstractMessageHandle implements IMessag
.eq(UserCall::getId,room.getRoomId()) .eq(UserCall::getId,room.getRoomId())
.set(UserCall::getStatus, RoomStatusEnums.STATUS_AGREE.getCode()) .set(UserCall::getStatus, RoomStatusEnums.STATUS_AGREE.getCode())
.set(UserCall::getBeginTime, LocalDateTime.now())); .set(UserCall::getBeginTime, LocalDateTime.now()));
amqpProducer.sendCheckTimeOut(room.getRoomId()+"",60); // 1分钟延时消息开始扣钱 amqpProducer.sendRoomSettleDelay(room.getRoomId()+"",60); // 1分钟延时消息开始扣钱
String message = "提示禁止任何涉黄、任何微信QQ引导到其它平台行为"; String message = "提示禁止任何涉黄、任何微信QQ引导到其它平台行为";
sendToAll(room.getRoomId(), WsRMsgGen.startVideo(room.getRoomId(),0L),WsRMsgGen.sysNotice(message)); sendToAll(room.getRoomId(), WsRMsgGen.startVideo(room.getRoomId(),0L),WsRMsgGen.sysNotice(message));
} }

View File

@@ -1,81 +0,0 @@
package com.ruoyi.cai.ws.job;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.trd.ImDataRes;
import com.ruoyi.cai.trd.ImMsgGen;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.service.SettleService;
import com.ruoyi.yunxin.Yunxin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Set;
@Component
@Slf4j
public class CheckTimeOutJob {
@Autowired
private OnlineDataCache onlineDataCache;
@Autowired
private RoomService roomService;
@Autowired
private UserCallService userCallService;
@Autowired
private SettleService settleService;
@Autowired
private Yunxin yunxin;
/**
* 检查房间是不是三分钟没有接听, 需要自动挂断掉
*/
// @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS)
public void run(){
Set<String> roomIdStr = onlineDataCache.getAll();
for (String roomIdS : roomIdStr) {
Long roomId = Long.valueOf(roomIdS);
this.deal(roomId);
}
}
public void deal(Long roomId){
log.info("开始执行房间超时检测,是否一直不接 roomId={}",roomId);
Room room = roomService.load(roomId);
if(room == null){
return;
}
if(!RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus())
&& !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){
return;
}
UserData callUserData = room.getCallUserData();
UserData receiverUserData = room.getReceiverUserData();
// 检测是不是3分钟没有接听
Long connectTimeCaller = callUserData.getConnectTime();
Long connectTimeReceiver = receiverUserData.getConnectTime();
boolean timeOut = false;
if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){
timeOut = true;
}else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){
timeOut = true;
}
if(timeOut){
if(roomService.hangUp(roomId)){
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
// TODO 需要放开主播的接听状态
roomService.closeAllFd(roomId);
ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0);
yunxin.sendToSync(receiverUserData.getId(),callUserData.getId(),imDataRes);
}
}
}
}

View File

@@ -1,102 +0,0 @@
package com.ruoyi.cai.ws.job;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.cache.RoomCtxCache;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.dto.WsR;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.service.SettleService;
import com.ruoyi.cai.ws.util.RoomWebSocketUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class HeartbeatJob {
@Autowired
private OnlineDataCache onlineDataCache;
@Autowired
private RoomService roomService;
@Autowired
private UserCallService userCallService;
@Autowired
private SettleService settleService;
@Autowired
private RoomCtxCache roomCtxCache;
/**
* 30秒执行一次 心跳只处理接听后的心跳
*/
@Scheduled(fixedDelay = 30,timeUnit = TimeUnit.SECONDS)
public void run(){
Set<String> all = onlineDataCache.getAll();
for (String roomIdS : all) {
try {
Long roomId = Long.valueOf(roomIdS);
Room room = roomService.load(roomId);
if(room == null){
return;
}
if(!room.isCanCall()){
return;
}
UserData callUserData = room.getCallUserData();
UserData receiverUserData = room.getReceiverUserData();
boolean timeOut = false;
WsR hangup = null;
if(isHeartTimeout(callUserData)){
timeOut = true;
hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode());
}else if(isHeartTimeout(receiverUserData)){
timeOut = true;
hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode());
}
if(timeOut){
if(roomService.hangUp(roomId)){
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
settleService.processOn(roomId);
}
List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId);
RoomWebSocketUtil.sendSendMessage(keys, hangup);
roomService.closeAllFd(roomId);
}
}catch (Exception e){
log.error("定时心跳检测失败!",e);
}
}
}
/**
* 是否心跳超时
* @param userData
* @return
*/
private boolean isHeartTimeout(UserData userData){
if(userData.getConnectTime() == null){
return false;
}
if(DateUtil.currentSeconds() - userData.getHeartTime() < 120){
return false;
}
return true;
}
}

View File

@@ -1,158 +1,37 @@
package com.ruoyi.cai.ws.job; package com.ruoyi.cai.ws.job;
import cn.hutool.core.date.DateUtil; import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.trd.ImDataRes;
import com.ruoyi.cai.trd.ImMsgGen;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.RoomCtxCache;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.dto.WsR;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.service.RoomService; import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.service.SettleResp;
import com.ruoyi.cai.ws.service.SettleService; import com.ruoyi.cai.ws.service.SettleService;
import com.ruoyi.cai.ws.util.RoomWebSocketUtil; import lombok.extern.slf4j.Slf4j;
import com.ruoyi.yunxin.Yunxin;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List; import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component @Component
@Slf4j
public class RoomCheckJob { public class RoomCheckJob {
@Autowired @Autowired
private RoomService roomService; private OnlineDataCache onlineDataCache;
@Autowired @Autowired
private UserCallService userCallService; private RoomCheckJobService roomCheckJobService;
@Autowired
private Yunxin yunxin;
@Autowired
private SettleService settleService;
@Autowired
private RoomCtxCache roomCtxCache;
public JobResp checkRoom(Long roomId){
Room room = roomService.load(roomId);
if(room == null){
return JobResp.builder().nextCreateJob(false).build();
}
// 检查是否三分钟没有接听
JobResp resp = this.checkRoomCallerTimeOut(room);
if(!resp.isNextCreateJob()){
return resp;
}
// 检查心跳
JobResp heartbeat = checkRoomHeartbeat(room);
if(!heartbeat.isNextCreateJob()){
return heartbeat;
}
this.checkCanCallTime(room);
// 尝试结算一下
SettleResp settleResp = settleService.processOn(roomId);
return JobResp.builder().nextCreateJob(settleResp.isNextRun()).build();
}
private void checkCanCallTime(Room room){
Long time = roomService.canCallTime(room);
if(time < 150 && time > 60){ // 提示余额不足
String sessionKey = roomCtxCache.getSessionKeyCallerByRoomId(room.getRoomId());
RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.rechargeNotice("您的余额不足,点此充值"));
}
}
private JobResp checkRoomHeartbeat(Room room){
if(!room.isCanCall()){
return JobResp.builder().nextCreateJob(true).build();
}
Long roomId = room.getRoomId();
UserData callUserData = room.getCallUserData();
UserData receiverUserData = room.getReceiverUserData();
boolean timeOut = false;
WsR hangup = null;
if(isHeartTimeout(callUserData)){
timeOut = true;
hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode());
}else if(isHeartTimeout(receiverUserData)){
timeOut = true;
hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode());
}
if(timeOut){
boolean nextCreateJob = false;
if(roomService.hangUp(roomId)){
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
SettleResp settleResp = settleService.processOn(roomId);
nextCreateJob = settleResp.isNextRun();
}
List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId);
RoomWebSocketUtil.sendSendMessage(keys, hangup);
roomService.closeAllFd(roomId);
return JobResp.builder().nextCreateJob(nextCreateJob).build();
}
return JobResp.builder().nextCreateJob(true).build();
}
// 检测是不是3分钟没有接听
private JobResp checkRoomCallerTimeOut(Room room){
if(RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus())
&& RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())) {
return JobResp.builder().nextCreateJob(true).build();
}
UserData callUserData = room.getCallUserData();
UserData receiverUserData = room.getReceiverUserData();
Long connectTimeCaller = callUserData.getConnectTime();
Long connectTimeReceiver = receiverUserData.getConnectTime();
boolean timeOut = false;
if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){
timeOut = true;
}else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){
timeOut = true;
}
if(timeOut){
Long roomId = room.getRoomId();
boolean nextCreateJob = false;
if(roomService.hangUp(roomId)){
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
SettleResp settleResp = settleService.processOn(roomId);
nextCreateJob = settleResp.isNextRun();
}
roomService.closeAllFd(roomId);
ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0);
yunxin.sendToSync(receiverUserData.getId(),callUserData.getId(),imDataRes);
return JobResp.builder().nextCreateJob(nextCreateJob).build();
}
return JobResp.builder().nextCreateJob(true).build();
}
/** /**
* 是否心跳超时 * 每 7分钟执行一次
* @param userData
* @return
*/ */
private boolean isHeartTimeout(UserData userData){ @Scheduled(fixedDelay = 7,timeUnit = TimeUnit.MINUTES)
if(userData.getConnectTime() == null){ public void run(){
return false; Set<String> all = onlineDataCache.getAll();
for (String roomIdS : all) {
try {
Long roomId = Long.valueOf(roomIdS);
roomCheckJobService.checkRoom(roomId);
}catch (Exception e){
log.info("定时检测房间失败!需要开发排查问题 roomId={}", roomIdS, e);
} }
if(DateUtil.currentSeconds() - userData.getHeartTime() < 117){
return false;
} }
return true;
} }
} }

View File

@@ -0,0 +1,158 @@
package com.ruoyi.cai.ws.job;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.trd.ImDataRes;
import com.ruoyi.cai.trd.ImMsgGen;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.RoomCtxCache;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.dto.WsR;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.service.SettleResp;
import com.ruoyi.cai.ws.service.SettleService;
import com.ruoyi.cai.ws.util.RoomWebSocketUtil;
import com.ruoyi.yunxin.Yunxin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class RoomCheckJobService {
@Autowired
private RoomService roomService;
@Autowired
private UserCallService userCallService;
@Autowired
private Yunxin yunxin;
@Autowired
private SettleService settleService;
@Autowired
private RoomCtxCache roomCtxCache;
public JobResp checkRoom(Long roomId){
Room room = roomService.load(roomId);
if(room == null){
return JobResp.builder().nextCreateJob(false).build();
}
// 检查是否三分钟没有接听
JobResp resp = this.checkRoomCallerTimeOut(room);
if(!resp.isNextCreateJob()){
return resp;
}
// 检查心跳
JobResp heartbeat = checkRoomHeartbeat(room);
if(!heartbeat.isNextCreateJob()){
return heartbeat;
}
this.checkCanCallTime(room);
// 尝试结算一下
SettleResp settleResp = settleService.processOn(roomId);
return JobResp.builder().nextCreateJob(settleResp.isNextRun()).build();
}
private void checkCanCallTime(Room room){
Long time = roomService.canCallTime(room);
if(time < 150 && time > 60){ // 提示余额不足
String sessionKey = roomCtxCache.getSessionKeyCallerByRoomId(room.getRoomId());
RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.rechargeNotice("您的余额不足,点此充值"));
}
}
private JobResp checkRoomHeartbeat(Room room){
if(!room.isCanCall()){
return JobResp.builder().nextCreateJob(true).build();
}
Long roomId = room.getRoomId();
UserData callUserData = room.getCallUserData();
UserData receiverUserData = room.getReceiverUserData();
boolean timeOut = false;
WsR hangup = null;
if(isHeartTimeout(callUserData)){
timeOut = true;
hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode());
}else if(isHeartTimeout(receiverUserData)){
timeOut = true;
hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode());
}
if(timeOut){
boolean nextCreateJob = false;
if(roomService.hangUp(roomId)){
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
SettleResp settleResp = settleService.processOn(roomId);
nextCreateJob = settleResp.isNextRun();
}
List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId);
RoomWebSocketUtil.sendSendMessage(keys, hangup);
roomService.closeAllFd(roomId);
return JobResp.builder().nextCreateJob(nextCreateJob).build();
}
return JobResp.builder().nextCreateJob(true).build();
}
// 检测是不是3分钟没有接听
private JobResp checkRoomCallerTimeOut(Room room){
if(RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus())
&& RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())) {
return JobResp.builder().nextCreateJob(true).build();
}
UserData callUserData = room.getCallUserData();
UserData receiverUserData = room.getReceiverUserData();
Long connectTimeCaller = callUserData.getConnectTime();
Long connectTimeReceiver = receiverUserData.getConnectTime();
boolean timeOut = false;
if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){
timeOut = true;
}else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){
timeOut = true;
}
if(timeOut){
Long roomId = room.getRoomId();
boolean nextCreateJob = false;
if(roomService.hangUp(roomId)){
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
SettleResp settleResp = settleService.processOn(roomId);
nextCreateJob = settleResp.isNextRun();
}
roomService.closeAllFd(roomId);
ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0);
yunxin.sendToSync(receiverUserData.getId(),callUserData.getId(),imDataRes);
return JobResp.builder().nextCreateJob(nextCreateJob).build();
}
return JobResp.builder().nextCreateJob(true).build();
}
/**
* 是否心跳超时
* @param userData
* @return
*/
private boolean isHeartTimeout(UserData userData){
if(userData.getConnectTime() == null){
return false;
}
if(DateUtil.currentSeconds() - userData.getHeartTime() < 117){
return false;
}
return true;
}
}

View File

@@ -1,40 +0,0 @@
package com.ruoyi.cai.ws.job;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.service.SettleService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class SettleJob {
@Autowired
private OnlineDataCache onlineDataCache;
@Autowired
private SettleService settleService;
@Autowired
private RoomService roomService;
/**
* 每 1 分钟执行一次
*/
// @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS)
public void run(){
Set<String> all = onlineDataCache.getAll();
for (String roomIdS : all) {
try {
Long roomId = Long.valueOf(roomIdS);
settleService.processOn(roomId);
}catch (Exception e){
log.info("定时任务结算失败!",e);
}
}
}
}

View File

@@ -122,8 +122,8 @@ public class OpenLogic {
userDataCache.hMSet(room.getRoomId(), UserDataConstant.TYPE_CALLER,map); userDataCache.hMSet(room.getRoomId(), UserDataConstant.TYPE_CALLER,map);
roomDataCache.setStatus(room.getRoomId(),RoomStatusEnums.STATUS_CALLER_CONNECT); roomDataCache.setStatus(room.getRoomId(),RoomStatusEnums.STATUS_CALLER_CONNECT);
onlineDataCache.add(room.getRoomId()); onlineDataCache.add(room.getRoomId());
amqpProducer.sendRoomCheckDelay(room.getRoomId()+"",60);
userService.updateVideoStatus(userId,1); userService.updateVideoStatus(userId,1);
amqpProducer.sendCommonDelayMq(1,room.getRoomId(),182);
isFirst = true; isFirst = true;
} }
// 已经接通 // 已经接通
@@ -169,7 +169,6 @@ public class OpenLogic {
map.put("connectTime", DateUtil.currentSeconds()); map.put("connectTime", DateUtil.currentSeconds());
map.put("heartTime",DateUtil.currentSeconds()); map.put("heartTime",DateUtil.currentSeconds());
userDataCache.hMSet(room.getRoomId(), UserDataConstant.TYPE_RECEIVER,map); userDataCache.hMSet(room.getRoomId(), UserDataConstant.TYPE_RECEIVER,map);
amqpProducer.sendCommonDelayMq(1,room.getRoomId(),182);
// 房间号状态设置为 接收方已连接 // 房间号状态设置为 接收方已连接
boolean res = roomDataCache.setStatusReceiverConnection(room.getRoomId()); boolean res = roomDataCache.setStatusReceiverConnection(room.getRoomId());
if(!res){ if(!res){