123
This commit is contained in:
@@ -6,7 +6,7 @@ import lombok.Data;
|
|||||||
@Data
|
@Data
|
||||||
public class VideoSettleResp {
|
public class VideoSettleResp {
|
||||||
private ConsumeLog consumeLog;
|
private ConsumeLog consumeLog;
|
||||||
private Long payCoin;
|
private Long payCoin = 0L;
|
||||||
private Long payIncome;
|
private Long payIncome = 0L;
|
||||||
private Long anchorIncome;
|
private Long anchorIncome = 0L;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,6 +17,9 @@ import com.ruoyi.cai.ws.handler.AbstractMessageHandle;
|
|||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 送礼物
|
||||||
|
*/
|
||||||
@Component("gift")
|
@Component("gift")
|
||||||
public class GiftMessageHandler extends AbstractMessageHandle implements IMessageHandler {
|
public class GiftMessageHandler extends AbstractMessageHandle implements IMessageHandler {
|
||||||
@Autowired
|
@Autowired
|
||||||
|
|||||||
11
ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/JobResp.java
Normal file
11
ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/JobResp.java
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
package com.ruoyi.cai.ws.job;
|
||||||
|
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
public class JobResp {
|
||||||
|
// 是否继续创建下一个JOB
|
||||||
|
private boolean nextCreateJob;
|
||||||
|
}
|
||||||
158
ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/RoomCheckJob.java
Normal file
158
ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/RoomCheckJob.java
Normal file
@@ -0,0 +1,158 @@
|
|||||||
|
package com.ruoyi.cai.ws.job;
|
||||||
|
|
||||||
|
import cn.hutool.core.date.DateUtil;
|
||||||
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||||
|
import com.ruoyi.cai.domain.UserCall;
|
||||||
|
import com.ruoyi.cai.service.UserCallService;
|
||||||
|
import com.ruoyi.cai.trd.ImDataRes;
|
||||||
|
import com.ruoyi.cai.trd.ImMsgGen;
|
||||||
|
import com.ruoyi.cai.ws.bean.Room;
|
||||||
|
import com.ruoyi.cai.ws.bean.UserData;
|
||||||
|
import com.ruoyi.cai.ws.cache.RoomCtxCache;
|
||||||
|
import com.ruoyi.cai.ws.constant.HangUpEnums;
|
||||||
|
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
|
||||||
|
import com.ruoyi.cai.ws.dto.WsR;
|
||||||
|
import com.ruoyi.cai.ws.dto.WsRMsgGen;
|
||||||
|
import com.ruoyi.cai.ws.service.RoomService;
|
||||||
|
import com.ruoyi.cai.ws.service.SettleResp;
|
||||||
|
import com.ruoyi.cai.ws.service.SettleService;
|
||||||
|
import com.ruoyi.cai.ws.util.RoomWebSocketUtil;
|
||||||
|
import com.ruoyi.yunxin.Yunxin;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class RoomCheckJob {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private RoomService roomService;
|
||||||
|
@Autowired
|
||||||
|
private UserCallService userCallService;
|
||||||
|
@Autowired
|
||||||
|
private Yunxin yunxin;
|
||||||
|
@Autowired
|
||||||
|
private SettleService settleService;
|
||||||
|
@Autowired
|
||||||
|
private RoomCtxCache roomCtxCache;
|
||||||
|
|
||||||
|
public JobResp checkRoom(Long roomId){
|
||||||
|
Room room = roomService.load(roomId);
|
||||||
|
if(room == null){
|
||||||
|
return JobResp.builder().nextCreateJob(false).build();
|
||||||
|
}
|
||||||
|
// 检查是否三分钟没有接听
|
||||||
|
JobResp resp = this.checkRoomCallerTimeOut(room);
|
||||||
|
if(!resp.isNextCreateJob()){
|
||||||
|
return resp;
|
||||||
|
}
|
||||||
|
// 检查心跳
|
||||||
|
JobResp heartbeat = checkRoomHeartbeat(room);
|
||||||
|
if(!heartbeat.isNextCreateJob()){
|
||||||
|
return heartbeat;
|
||||||
|
}
|
||||||
|
this.checkCanCallTime(room);
|
||||||
|
// 尝试结算一下
|
||||||
|
SettleResp settleResp = settleService.processOn(roomId);
|
||||||
|
return JobResp.builder().nextCreateJob(settleResp.isNextRun()).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void checkCanCallTime(Room room){
|
||||||
|
Long time = roomService.canCallTime(room);
|
||||||
|
if(time < 150 && time > 60){ // 提示余额不足
|
||||||
|
String sessionKey = roomCtxCache.getSessionKeyCallerByRoomId(room.getRoomId());
|
||||||
|
RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.rechargeNotice("您的余额不足,点此充值"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private JobResp checkRoomHeartbeat(Room room){
|
||||||
|
if(!room.isCanCall()){
|
||||||
|
return JobResp.builder().nextCreateJob(true).build();
|
||||||
|
}
|
||||||
|
Long roomId = room.getRoomId();
|
||||||
|
UserData callUserData = room.getCallUserData();
|
||||||
|
UserData receiverUserData = room.getReceiverUserData();
|
||||||
|
boolean timeOut = false;
|
||||||
|
WsR hangup = null;
|
||||||
|
if(isHeartTimeout(callUserData)){
|
||||||
|
timeOut = true;
|
||||||
|
hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode());
|
||||||
|
}else if(isHeartTimeout(receiverUserData)){
|
||||||
|
timeOut = true;
|
||||||
|
hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode());
|
||||||
|
}
|
||||||
|
if(timeOut){
|
||||||
|
boolean nextCreateJob = false;
|
||||||
|
if(roomService.hangUp(roomId)){
|
||||||
|
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
|
||||||
|
.eq(UserCall::getId,roomId)
|
||||||
|
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
|
||||||
|
SettleResp settleResp = settleService.processOn(roomId);
|
||||||
|
nextCreateJob = settleResp.isNextRun();
|
||||||
|
}
|
||||||
|
List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId);
|
||||||
|
RoomWebSocketUtil.sendSendMessage(keys, hangup);
|
||||||
|
roomService.closeAllFd(roomId);
|
||||||
|
return JobResp.builder().nextCreateJob(nextCreateJob).build();
|
||||||
|
}
|
||||||
|
return JobResp.builder().nextCreateJob(true).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// 检测是不是3分钟没有接听
|
||||||
|
private JobResp checkRoomCallerTimeOut(Room room){
|
||||||
|
if(RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus())
|
||||||
|
&& RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())) {
|
||||||
|
return JobResp.builder().nextCreateJob(true).build();
|
||||||
|
}
|
||||||
|
UserData callUserData = room.getCallUserData();
|
||||||
|
UserData receiverUserData = room.getReceiverUserData();
|
||||||
|
Long connectTimeCaller = callUserData.getConnectTime();
|
||||||
|
Long connectTimeReceiver = receiverUserData.getConnectTime();
|
||||||
|
boolean timeOut = false;
|
||||||
|
if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){
|
||||||
|
timeOut = true;
|
||||||
|
}else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){
|
||||||
|
timeOut = true;
|
||||||
|
}
|
||||||
|
if(timeOut){
|
||||||
|
Long roomId = room.getRoomId();
|
||||||
|
boolean nextCreateJob = false;
|
||||||
|
if(roomService.hangUp(roomId)){
|
||||||
|
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
|
||||||
|
.eq(UserCall::getId,roomId)
|
||||||
|
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
|
||||||
|
SettleResp settleResp = settleService.processOn(roomId);
|
||||||
|
nextCreateJob = settleResp.isNextRun();
|
||||||
|
}
|
||||||
|
roomService.closeAllFd(roomId);
|
||||||
|
ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0);
|
||||||
|
yunxin.sendToSync(receiverUserData.getId(),callUserData.getId(),imDataRes);
|
||||||
|
return JobResp.builder().nextCreateJob(nextCreateJob).build();
|
||||||
|
}
|
||||||
|
return JobResp.builder().nextCreateJob(true).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否心跳超时
|
||||||
|
* @param userData
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private boolean isHeartTimeout(UserData userData){
|
||||||
|
if(userData.getConnectTime() == null){
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if(DateUtil.currentSeconds() - userData.getHeartTime() < 117){
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
package com.ruoyi.cai.ws.service;
|
||||||
|
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@Builder
|
||||||
|
public class SettleResp {
|
||||||
|
/**
|
||||||
|
* 是否需要 过一会在执行
|
||||||
|
*/
|
||||||
|
private boolean nextRun;
|
||||||
|
}
|
||||||
@@ -101,27 +101,26 @@ public class SettleService {
|
|||||||
/**
|
/**
|
||||||
* 结算处理
|
* 结算处理
|
||||||
*/
|
*/
|
||||||
@SneakyThrows
|
public SettleResp processOn(Long roomId){
|
||||||
public void processOn(Long roomId){
|
|
||||||
Room room = roomService.load(roomId);
|
Room room = roomService.load(roomId);
|
||||||
if(room == null){
|
if(room == null){
|
||||||
log.warn("房间不存在,无需结算 roomId={}",roomId);
|
log.warn("房间不存在,无需结算 roomId={}",roomId);
|
||||||
return;
|
return SettleResp.builder().nextRun(false).build();
|
||||||
}
|
}
|
||||||
String lock = LockManager.getVideoSettleLock(room.getRoomId());
|
String lock = LockManager.getVideoSettleLock(room.getRoomId());
|
||||||
RLock clientLock = redissonClient.getLock(lock);
|
RLock clientLock = redissonClient.getLock(lock);
|
||||||
boolean locked = clientLock.isLocked();
|
boolean locked = clientLock.isLocked();
|
||||||
if(locked){
|
if(locked){
|
||||||
log.info("正在结算中,稍等!");
|
log.info("正在结算中,稍等!");
|
||||||
return;
|
return SettleResp.builder().nextRun(true).build();
|
||||||
}
|
}
|
||||||
boolean lockFlag = clientLock.tryLock();
|
boolean lockFlag = clientLock.tryLock();
|
||||||
if(!lockFlag){
|
if(!lockFlag){
|
||||||
log.info("正在结算中,稍等!");
|
log.info("正在结算中,稍等!");
|
||||||
return;
|
return SettleResp.builder().nextRun(true).build();
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
deal(room);
|
return deal(room);
|
||||||
}finally {
|
}finally {
|
||||||
clientLock.unlock();
|
clientLock.unlock();
|
||||||
}
|
}
|
||||||
@@ -135,20 +134,21 @@ public class SettleService {
|
|||||||
* 结算内部整合逻辑
|
* 结算内部整合逻辑
|
||||||
* @param room
|
* @param room
|
||||||
*/
|
*/
|
||||||
private void deal(Room room){
|
private SettleResp deal(Room room){
|
||||||
if(room.isCanCall() || room.isSettle()){
|
if(room.isCanCall() || room.isSettle()){ // 正在通话中,或者已经结算了 ,都无需在结算
|
||||||
return;
|
return SettleResp.builder().nextRun(true).build();
|
||||||
}
|
}
|
||||||
if(!room.isReleaseRes()){ // 房间资源是否已经释放
|
if(!room.isReleaseRes()){ // 房间资源是否已经释放
|
||||||
roomService.releaseRes(room.getRoomId());
|
roomService.releaseRes(room.getRoomId()); // 释放房间会更新用户和主播的状态
|
||||||
}
|
}
|
||||||
Long roomId = room.getRoomId();
|
Long roomId = room.getRoomId();
|
||||||
// 未通话,无需结算
|
// 未通话,无需结算
|
||||||
|
VideoSettleResp resp = new VideoSettleResp();
|
||||||
if(room.getRoomData().getBeginTime() == null){
|
if(room.getRoomData().getBeginTime() == null){
|
||||||
roomDataCache.hMSet(roomId,"settleTime", DateUtil.currentSeconds());
|
roomDataCache.hMSet(roomId,"settleTime", DateUtil.currentSeconds());
|
||||||
return;
|
}else{
|
||||||
|
resp = consumerManager.videoSettle(room);
|
||||||
}
|
}
|
||||||
VideoSettleResp resp = consumerManager.videoSettle(room);
|
|
||||||
// 修改房间缓存
|
// 修改房间缓存
|
||||||
Map<String,Object> map = new HashMap<>();
|
Map<String,Object> map = new HashMap<>();
|
||||||
map.put("payCoin",resp.getPayCoin());
|
map.put("payCoin",resp.getPayCoin());
|
||||||
@@ -174,6 +174,7 @@ public class SettleService {
|
|||||||
// ImMsgGen.videoIncome();
|
// ImMsgGen.videoIncome();
|
||||||
// yunxin.sendTo(receiveId,null,)
|
// yunxin.sendTo(receiveId,null,)
|
||||||
}
|
}
|
||||||
|
return SettleResp.builder().nextRun(false).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user