123
This commit is contained in:
@@ -0,0 +1,39 @@
|
||||
package com.ruoyi.consumer;
|
||||
|
||||
import com.ruoyi.cai.mq.AmqpWsProducer;
|
||||
import com.ruoyi.cai.mq.constant.ws.RoomCheckDelayWsMqConstant;
|
||||
import com.ruoyi.cai.ws.job.JobResp;
|
||||
import com.ruoyi.job.RoomCheckJobService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RoomCheckDelayMqConsumer {
|
||||
@Autowired
|
||||
private AmqpWsProducer amqpWsProducer;
|
||||
@Autowired
|
||||
private RoomCheckJobService roomCheckJobService;
|
||||
|
||||
@RabbitListener(queues = RoomCheckDelayWsMqConstant.QUEUE_NAME
|
||||
,containerFactory = "customContainerFactory")
|
||||
public void sendRoomCheck(String roomIdStr) {
|
||||
log.info("开始执行房间检测: roomId={}",roomIdStr);
|
||||
try {
|
||||
Long roomId = Long.valueOf(roomIdStr);
|
||||
JobResp resp = roomCheckJobService.checkRoom(roomId);
|
||||
if(resp.isNextCreateJob()){
|
||||
// 1分钟后继续执行
|
||||
log.info("1分钟后继续执行房间检测: roomId={}",roomIdStr);
|
||||
amqpWsProducer.sendRoomCheckDelay(roomIdStr,60);
|
||||
}
|
||||
}catch (Exception e){
|
||||
log.error("每分钟定时检测房间失败! roomId={}",roomIdStr);
|
||||
}finally {
|
||||
log.info("结束执行房间检测: roomId={}",roomIdStr);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.ruoyi.consumer;
|
||||
|
||||
import com.ruoyi.cai.mq.AmqpWsProducer;
|
||||
import com.ruoyi.cai.mq.constant.ws.RoomSettleDelayWsMqConstant;
|
||||
import com.ruoyi.cai.ws.service.SettleService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RoomSettleDelayMqConsumer {
|
||||
@Autowired
|
||||
private SettleService settleService;
|
||||
@Autowired
|
||||
private AmqpWsProducer amqpWsProducer;
|
||||
|
||||
@RabbitListener(queues = RoomSettleDelayWsMqConstant.QUEUE_NAME
|
||||
,containerFactory = "customContainerFactory")
|
||||
public void checkTimeOutMq(String message) {
|
||||
log.info("开始执行预扣费: " + message);
|
||||
try {
|
||||
boolean next = settleService.withholdingFee(Long.valueOf(message));
|
||||
if(next){
|
||||
// 1分钟后继续执行
|
||||
amqpWsProducer.sendRoomSettleDelay(message,60);
|
||||
}
|
||||
}catch (Exception e){
|
||||
log.error("每分钟定时扣费失败!",e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.ruoyi.job;
|
||||
|
||||
import com.ruoyi.cai.ws.cache.OnlineDataCache;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RoomCheckJob {
|
||||
@Autowired
|
||||
private OnlineDataCache onlineDataCache;
|
||||
@Autowired
|
||||
private RoomCheckJobService roomCheckJobService;
|
||||
|
||||
/**
|
||||
* 每 7分钟执行一次
|
||||
*/
|
||||
@Scheduled(fixedDelay = 7,timeUnit = TimeUnit.MINUTES)
|
||||
public void run(){
|
||||
Set<String> all = onlineDataCache.getAll();
|
||||
for (String roomIdS : all) {
|
||||
try {
|
||||
Long roomId = Long.valueOf(roomIdS);
|
||||
roomCheckJobService.checkRoom(roomId);
|
||||
}catch (Exception e){
|
||||
log.info("定时检测房间失败!需要开发排查问题 roomId={}", roomIdS, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,176 @@
|
||||
package com.ruoyi.job;
|
||||
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.ruoyi.cai.domain.UserCall;
|
||||
import com.ruoyi.cai.notice.YunxinWsServiceV2;
|
||||
import com.ruoyi.cai.service.UserCallService;
|
||||
import com.ruoyi.cai.ws.bean.Room;
|
||||
import com.ruoyi.cai.ws.bean.UserData;
|
||||
import com.ruoyi.cai.ws.cache.RoomCtxCache;
|
||||
import com.ruoyi.cai.ws.constant.HangUpEnums;
|
||||
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
|
||||
import com.ruoyi.cai.ws.dto.WsR;
|
||||
import com.ruoyi.cai.ws.dto.WsRMsgGen;
|
||||
import com.ruoyi.cai.ws.job.JobResp;
|
||||
import com.ruoyi.cai.ws.service.RoomService;
|
||||
import com.ruoyi.cai.ws.service.SettleResp;
|
||||
import com.ruoyi.cai.ws.service.SettleService;
|
||||
import com.ruoyi.cai.ws.util.RoomWebSocketUtil;
|
||||
import com.ruoyi.yunxin.Yunxin;
|
||||
import com.ruoyi.cai.notice.enums.CallNoticeEnum;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class RoomCheckJobService {
|
||||
|
||||
@Autowired
|
||||
private RoomService roomService;
|
||||
@Autowired
|
||||
private UserCallService userCallService;
|
||||
@Autowired
|
||||
private Yunxin yunxin;
|
||||
@Autowired
|
||||
private SettleService settleService;
|
||||
@Autowired
|
||||
private RoomCtxCache roomCtxCache;
|
||||
|
||||
public JobResp checkRoom(Long roomId){
|
||||
Room room = roomService.load(roomId);
|
||||
log.info("调试日志:开始检查房间 room={}" , JSON.toJSONString(room));
|
||||
if(room == null){
|
||||
return JobResp.builder().nextCreateJob(false).build();
|
||||
}
|
||||
// 检查是否三分钟没有接听
|
||||
log.info("调试日志:检查是否三分钟没有接听 roomId={}" , roomId);
|
||||
JobResp resp = this.checkRoomCallerTimeOut(room);
|
||||
if(!resp.isNextCreateJob()){
|
||||
log.info("调试日志:三分钟没有接听,可能关闭了 roomId={}" , roomId);
|
||||
return resp;
|
||||
}
|
||||
// 检查心跳
|
||||
log.info("调试日志:检查心跳是否超时 roomId={}" , roomId);
|
||||
JobResp heartbeat = checkRoomHeartbeat(room);
|
||||
if(!heartbeat.isNextCreateJob()){
|
||||
log.info("调试日志:心跳超时了,可能关闭了 roomId={}" , roomId);
|
||||
return heartbeat;
|
||||
}
|
||||
this.checkCanCallTime(room);
|
||||
// 尝试结算一下
|
||||
log.info("调试日志:开始尝试结算 roomId={}" , roomId);
|
||||
SettleResp settleResp = settleService.processOn(roomId);
|
||||
JobResp build = JobResp.builder().nextCreateJob(settleResp.isNextRun()).build();
|
||||
if(!build.isNextCreateJob()){
|
||||
log.info("调试日志:结算成功了,可能关闭了 roomId={}" , roomId);
|
||||
}
|
||||
return build;
|
||||
}
|
||||
|
||||
|
||||
private void checkCanCallTime(Room room){
|
||||
Long time = roomService.canCallTime(room);
|
||||
if(time < 150 && time > 60){ // 提示余额不足
|
||||
String sessionKey = roomCtxCache.getSessionKeyCallerByRoomId(room.getRoomId());
|
||||
RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.rechargeNotice("您的余额不足,点此充值"));
|
||||
}
|
||||
}
|
||||
|
||||
private JobResp checkRoomHeartbeat(Room room){
|
||||
if(!room.isCanCall()){
|
||||
return JobResp.builder().nextCreateJob(true).build();
|
||||
}
|
||||
Long roomId = room.getRoomId();
|
||||
UserData callUserData = room.getCallUserData();
|
||||
UserData receiverUserData = room.getReceiverUserData();
|
||||
boolean timeOut = false;
|
||||
WsR hangup = null;
|
||||
if(isHeartTimeout(callUserData)){
|
||||
timeOut = true;
|
||||
hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode());
|
||||
}else if(isHeartTimeout(receiverUserData)){
|
||||
timeOut = true;
|
||||
hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode());
|
||||
}
|
||||
if(timeOut){
|
||||
boolean nextCreateJob = false;
|
||||
if(roomService.hangUp(roomId)){
|
||||
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
|
||||
.eq(UserCall::getId,roomId)
|
||||
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
|
||||
SettleResp settleResp = settleService.processOn(roomId);
|
||||
nextCreateJob = settleResp.isNextRun();
|
||||
}
|
||||
List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId);
|
||||
RoomWebSocketUtil.sendSendMessage(keys, hangup);
|
||||
roomService.closeAllFd(roomId);
|
||||
return JobResp.builder().nextCreateJob(nextCreateJob).build();
|
||||
}
|
||||
return JobResp.builder().nextCreateJob(true).build();
|
||||
}
|
||||
|
||||
|
||||
// 检测是不是3分钟没有接听
|
||||
private JobResp checkRoomCallerTimeOut(Room room){
|
||||
if(!RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus())
|
||||
&& !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())) {
|
||||
return JobResp.builder().nextCreateJob(true).build();
|
||||
}
|
||||
UserData callUserData = room.getCallUserData();
|
||||
UserData receiverUserData = room.getReceiverUserData();
|
||||
Long connectTimeCaller = callUserData.getConnectTime();
|
||||
Long connectTimeReceiver = receiverUserData.getConnectTime();
|
||||
boolean timeOut = false;
|
||||
if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){
|
||||
timeOut = true;
|
||||
}else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){
|
||||
timeOut = true;
|
||||
}
|
||||
if(timeOut){
|
||||
Long roomId = room.getRoomId();
|
||||
boolean nextCreateJob = false;
|
||||
if(roomService.hangUp(roomId)){
|
||||
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
|
||||
.eq(UserCall::getId,roomId)
|
||||
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
|
||||
SettleResp settleResp = settleService.processOn(roomId);
|
||||
nextCreateJob = settleResp.isNextRun();
|
||||
}
|
||||
roomService.closeAllFd(roomId);
|
||||
if(!nextCreateJob){
|
||||
yunxinWsService.sendToCallNotify(callUserData.getId(),receiverUserData.getId(), CallNoticeEnum.TIMEOUT,0L);
|
||||
}
|
||||
return JobResp.builder().nextCreateJob(nextCreateJob).build();
|
||||
}
|
||||
return JobResp.builder().nextCreateJob(true).build();
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private YunxinWsServiceV2 yunxinWsService;
|
||||
|
||||
|
||||
/**
|
||||
* 是否心跳超时
|
||||
* @param userData
|
||||
* @return
|
||||
*/
|
||||
private boolean isHeartTimeout(UserData userData){
|
||||
if(userData.getConnectTime() == null){
|
||||
return false;
|
||||
}
|
||||
if(DateUtil.currentSeconds() - userData.getHeartTime() < 117){
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -32,9 +32,9 @@ spring:
|
||||
driverClassName: com.mysql.cj.jdbc.Driver
|
||||
# jdbc 所有参数配置参考 https://lionli.blog.csdn.net/article/details/122018562
|
||||
# rewriteBatchedStatements=true 批处理优化 大幅提升批量插入更新删除性能(对数据库有性能损耗 使用批量操作应考虑性能问题)
|
||||
url: jdbc:mysql://124.222.254.188:4306/cai-new?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true
|
||||
url: jdbc:mysql://localhost:3306/cai-new?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true
|
||||
username: root
|
||||
password: 383200134
|
||||
password: 123
|
||||
# 从库数据源
|
||||
slave:
|
||||
lazy: true
|
||||
@@ -67,9 +67,9 @@ spring:
|
||||
# 端口,默认为6379
|
||||
port: 9379
|
||||
# 数据库索引
|
||||
database: 15
|
||||
database: 12
|
||||
# 密码(如没有密码请注释掉)
|
||||
password: 383200134
|
||||
password: 123
|
||||
# 连接超时时间
|
||||
timeout: 15s
|
||||
# 是否开启ssl
|
||||
@@ -104,9 +104,9 @@ spring:
|
||||
rabbitmq:
|
||||
addresses: 127.0.0.1 #ip地址
|
||||
username: admin # 账号
|
||||
password: 383200134 # 密码
|
||||
password: 123 # 密码
|
||||
port: 5672
|
||||
virtual-host: /cai
|
||||
virtual-host: /cai-dev
|
||||
agora:
|
||||
app-id: 58ff3a37d91d48c7a8ef7a56fb8f62d0
|
||||
key: 0cca1a53262c4c74b0a8c653a9b7540e
|
||||
|
||||
@@ -60,7 +60,7 @@ spring:
|
||||
# 端口,默认为6379
|
||||
port: 9379
|
||||
# 数据库索引
|
||||
database: 15
|
||||
database: 12
|
||||
# 密码(如没有密码请注释掉)
|
||||
password: 383200134
|
||||
# 连接超时时间
|
||||
|
||||
Reference in New Issue
Block a user