This commit is contained in:
张良(004796)
2024-01-12 17:50:50 +08:00
parent 728192d4f0
commit 28623f17d1
28 changed files with 746 additions and 110 deletions

View File

@@ -1,5 +1,6 @@
package com.ruoyi.cai.ws.bean;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import lombok.Data;
import java.math.BigDecimal;
@@ -13,11 +14,19 @@ public class RoomData {
private BigDecimal videoDivide;
private Long payCoin = 0L;
private Long payIncome = 0L;
private Long hangUpTime; // 掉线时间
private Long hangUpTime; // 结束时间
private Long settleTime; // 结算时间
private Long beginTime; // 开始时间
private boolean releaseRes = false;
public boolean isCanCall(){
return RoomStatusEnums.isCanCall(status);
}
public boolean isOnline() {
return RoomStatusEnums.STATUS_AGREE.getCode().equals(status);
}
}

View File

@@ -5,6 +5,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Set;
@Component
public class OnlineDataCache {
@Autowired
@@ -14,6 +17,10 @@ public class OnlineDataCache {
return RedisConstant.ONLINE_ROOM_DATA;
}
public Set<String> getAll(){
return redisTemplate.opsForSet().members(getKey());
}
public void add(Long roomId){
redisTemplate.opsForSet().add(getKey(), String.valueOf(roomId));
}

View File

@@ -1,22 +1,17 @@
package com.ruoyi.cai.ws.cache;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.cai.ws.bean.RoomData;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.poi.ss.formula.functions.Roman;
import org.redisson.api.RBucket;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -105,4 +100,15 @@ public class RoomDataCache {
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),currentTime);
return BooleanUtils.isTrue(execute);
}
private final static String INCS_BLACK_AMOUNT =
"redis.call('hIncrBy', KEYS[1], 'payCoin', ARGV[1])\n" +
"redis.call('hIncrBy', KEYS[1], 'payIncome', ARGV[2])";
public boolean incsBlackAmount(Long roomId, Long decrCoin, Long decrIncomeCoin) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(INCS_BLACK_AMOUNT,Boolean.class);
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),
decrCoin,decrIncomeCoin);
return BooleanUtils.isTrue(execute);
}
}

View File

@@ -2,10 +2,12 @@ package com.ruoyi.cai.ws.handler;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.cai.chat.ChatManager;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.cache.FdCtxDataCache;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.dto.WsToken;
import com.ruoyi.cai.ws.service.CheckConnectionDTO;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.util.MapGetUtil;
@@ -29,6 +31,8 @@ public class MessageHandleApplication {
private RoomService roomService;
@Autowired
private FdCtxDataCache fdCtxDataCache;
@Autowired
private ChatManager chatManager;
public void processOn(WebSocketSession session, TextMessage message) {
String payload = message.getPayload();
@@ -38,23 +42,27 @@ public class MessageHandleApplication {
return;
}
Map<String, Object> attributes = session.getAttributes();
Long roomId = MapGetUtil.getLong(attributes.get("roomId"));
String sessionKey = MapGetUtil.getString(attributes.get("token"));
WsToken wsToken = chatManager.getToken(String.valueOf(attributes.get("token")));
if(wsToken == null){
WsExceptionUtil.throwExceptionFast(session,"无效token");
return;
}
Long roomId = wsToken.getRoomId();
Room room = roomService.load(roomId);
if(room == null){
WsExceptionUtil.throwException("房间不可用", sessionKey,HangUpEnums.OTHER, roomId);
WsExceptionUtil.throwException(session, "房间不可用" ,HangUpEnums.OTHER, roomId);
return;
}
CheckConnectionDTO checkConnect = roomService.checkConnect(room);
if(checkConnect != null){
WsExceptionUtil.throwException(sessionKey,checkConnect.getMessage(),checkConnect.getHangUpEnums(),roomId);
WsExceptionUtil.throwException(session,checkConnect.getMessage(),checkConnect.getHangUpEnums(),roomId);
return;
}
IMessageHandler handler = map.get(String.valueOf(method));
if(handler == null){
return;
}
FdCtxData fdCtxData = fdCtxDataCache.getByRoomId(sessionKey);
FdCtxData fdCtxData = fdCtxDataCache.getByRoomId(session.getId());
handler.processOn(room,fdCtxData, jsonObject);
}

View File

@@ -13,6 +13,7 @@ import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import javax.websocket.server.PathParam;
import java.io.IOException;
import java.util.Map;
/**
@@ -32,9 +33,9 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler {
* 连接成功后
*/
@Override
public void afterConnectionEstablished(@NotNull WebSocketSession session) {
public void afterConnectionEstablished(WebSocketSession session) {
openLogic.processOn(session);
// log.info("[connect] sessionId: {},userId:{}", session.getId(), session.getId());
log.info("[connect] sessionId: {},userId:{}", session.getId(), session.getId());
}
/**
@@ -46,7 +47,6 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler {
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String token = String.valueOf(session.getAttributes().get(WebSocketConstants.TOKEN));
messageHandleApplication.processOn(session,message);
}

View File

@@ -3,6 +3,7 @@ package com.ruoyi.cai.ws.handler.message;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.mq.AmqpProducer;
import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
@@ -26,6 +27,8 @@ public class AgreeMessageHandle extends AbstractMessageHandle implements IMessag
private RoomDataCache roomDataCache;
@Autowired
private UserCallService userCallService;
@Autowired
private AmqpProducer amqpProducer;
@Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
@@ -41,6 +44,7 @@ public class AgreeMessageHandle extends AbstractMessageHandle implements IMessag
.eq(UserCall::getId,room.getRoomId())
.set(UserCall::getStatus, RoomStatusEnums.STATUS_AGREE.getCode())
.set(UserCall::getBeginTime, LocalDateTime.now()));
amqpProducer.sendCheckTimeOut(room.getRoomId()+"",60); // 1分钟延时消息开始扣钱
String message = "提示禁止任何涉黄、任何微信QQ引导到其它平台行为";
sendToAll(room.getRoomId(), WsRMsgGen.startVideo(room.getRoomId(),0L),WsRMsgGen.sysNotice(message));
}

View File

@@ -6,6 +6,7 @@ import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.trd.ImDataRes;
import com.ruoyi.cai.trd.ImMsgGen;
import com.ruoyi.cai.ws.cache.RoomDataCache;
import com.ruoyi.yunxin.Yunxin;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room;
@@ -38,6 +39,10 @@ public class CancelMessageHandler extends AbstractMessageHandle implements IMess
&& !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){
return;
}
// 挂断房间
if(!roomService.hangUp(room.getRoomId())){
return;
}
Long roomId = room.getRoomId();
sendToCurrent(fdCtxData,WsRMsgGen.hangup("通话已取消",roomId, HangUpEnums.CANCEL.getCode()));
sendToReceiver(roomId,WsRMsgGen.hangup("对方已取消",roomId, HangUpEnums.CANCEL.getCode()));

View File

@@ -9,6 +9,7 @@ import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.handler.AbstractMessageHandle;
import com.ruoyi.cai.ws.handler.IMessageHandler;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.service.SettleService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -20,7 +21,7 @@ public class HangupMessageHandler extends AbstractMessageHandle implements IMess
@Autowired
private RoomService roomService;
@Autowired
private RoomDataCache roomDataCache;
private SettleService settleService;
@Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
@@ -31,13 +32,12 @@ public class HangupMessageHandler extends AbstractMessageHandle implements IMess
if(!room.getRoomId().equals(fdCtxData.getRoomId())){
return;
}
boolean b = roomDataCache.hangUp(room.getRoomId());
if(!b){
// 挂断房间
if(!roomService.hangUp(room.getRoomId())){
return;
}
// 触发结算 TODO
// 触发结算
settleService.processOn(room);
Integer type = fdCtxData.isCaller() ? HangUpEnums.FROM.getCode() : HangUpEnums.TO.getCode();
sendToCurrent(fdCtxData,WsRMsgGen.hangup("您已挂断",room.getRoomId(), type));
sendToTar(fdCtxData,WsRMsgGen.hangup("对方已挂断",room.getRoomId(), type));

View File

@@ -24,8 +24,6 @@ import org.springframework.stereotype.Component;
*/
@Component("refuse")
public class RefuseMessageHandler extends AbstractMessageHandle implements IMessageHandler {
@Autowired
private RoomDataCache roomDataCache;
@Autowired
private RoomService roomService;
@Autowired
@@ -37,7 +35,7 @@ public class RefuseMessageHandler extends AbstractMessageHandle implements IMess
if(!fdCtxData.isReceiver() || !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){
return;
}
boolean b = roomDataCache.hangUp(room.getRoomId());
boolean b = roomService.hangUp(room.getRoomId());
if(!b){
return;
}

View File

@@ -0,0 +1,75 @@
package com.ruoyi.cai.ws.job;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.trd.ImDataRes;
import com.ruoyi.cai.trd.ImMsgGen;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.cache.RoomCtxCache;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.yunxin.Yunxin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
public class CheckTimeOutJob {
@Autowired
private OnlineDataCache onlineDataCache;
@Autowired
private RoomService roomService;
@Autowired
private UserCallService userCallService;
@Autowired
private RoomCtxCache roomCtxCache;
@Autowired
private Yunxin yunxin;
/**
* 1 分钟执行一次
*/
@Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS)
public void run(){
Set<String> roomIdStr = onlineDataCache.getAll();
for (String roomIdS : roomIdStr) {
Long roomId = Long.valueOf(roomIdS);
this.deal(roomId);
}
}
public void deal(Long roomId){
Room room = roomService.load(roomId);
if(!RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus())
&& !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){
return;
}
UserData callUserData = room.getCallUserData();
UserData receiverUserData = room.getReceiverUserData();
// 检测是不是3分钟没有接听
Long connectTimeCaller = callUserData.getConnectTime();
Long connectTimeReceiver = receiverUserData.getConnectTime();
boolean timeOut = false;
if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){
timeOut = true;
}else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){
timeOut = true;
}
if(timeOut){
if(roomService.hangUp(roomId)){
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
roomService.closeAllFd(roomId);
ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0);
yunxin.sendTo(receiverUserData.getId(),callUserData.getId(),imDataRes);
}
}
}
}

View File

@@ -0,0 +1,99 @@
package com.ruoyi.cai.ws.job;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.cache.RoomCtxCache;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.dto.WsR;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.service.SettleService;
import com.ruoyi.cai.ws.util.RoomWebSocketUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class HeartbeatJob {
@Autowired
private OnlineDataCache onlineDataCache;
@Autowired
private RoomService roomService;
@Autowired
private UserCallService userCallService;
@Autowired
private SettleService settleService;
@Autowired
private RoomCtxCache roomCtxCache;
/**
* 30秒执行一次
*/
@Scheduled(fixedDelay = 30,timeUnit = TimeUnit.SECONDS)
public void run(){
Set<String> all = onlineDataCache.getAll();
for (String roomIdS : all) {
try {
Long roomId = Long.valueOf(roomIdS);
Room room = roomService.load(roomId);
if(!room.isCanCall()){
return;
}
UserData callUserData = room.getCallUserData();
UserData receiverUserData = room.getReceiverUserData();
boolean timeOut = false;
WsR hangup = null;
if(isHeartTimeout(callUserData)){
timeOut = true;
hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode());
}else if(isHeartTimeout(receiverUserData)){
timeOut = true;
hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode());
}
if(timeOut){
if(roomService.hangUp(roomId)){
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
settleService.processOn(room);
}
List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId);
RoomWebSocketUtil.sendSendMessage(keys, hangup);
roomService.closeAllFd(roomId);
}
}catch (Exception e){
log.error("定时心跳检测失败!",e);
}
}
}
/**
* 是否心跳超时
* @param userData
* @return
*/
private boolean isHeartTimeout(UserData userData){
if(userData.getConnectTime() == null){
return false;
}
if(DateUtil.currentSeconds() - userData.getHeartTime() < 120){
return false;
}
return true;
}
}

View File

@@ -0,0 +1,44 @@
package com.ruoyi.cai.ws.job;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.service.SettleService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class SettleJob {
@Autowired
private OnlineDataCache onlineDataCache;
@Autowired
private SettleService settleService;
@Autowired
private RoomService roomService;
/**
* 每 1 分钟执行一次
*/
@Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS)
public void run(){
Set<String> all = onlineDataCache.getAll();
for (String roomIdS : all) {
try {
Long roomId = Long.valueOf(roomIdS);
Room room = roomService.load(roomId);
if(room == null){
return;
}
settleService.processOn(room);
}catch (Exception e){
log.info("定时任务结算失败!",e);
}
}
}
}

View File

@@ -3,6 +3,7 @@ package com.ruoyi.cai.ws.processon;
import cn.hutool.core.date.DateUtil;
import com.ruoyi.cai.chat.ChatManager;
import com.ruoyi.cai.executor.ExecutorConstant;
import com.ruoyi.cai.mq.AmqpProducer;
import com.ruoyi.cai.service.AnchorService;
import com.ruoyi.cai.service.UserService;
import com.ruoyi.cai.trd.Agora;
@@ -27,7 +28,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -57,7 +57,7 @@ public class OpenLogic {
@Autowired
private Yunxin yunxin;
public void processOn(WebSocketSession session) throws IOException {
public void processOn(WebSocketSession session) {
Map<String, Object> attributes = session.getAttributes();
Object token = attributes.get("token");
if(token == null){
@@ -105,9 +105,11 @@ public class OpenLogic {
// 走接收方逻辑
receiverConnection(session,room,userId);
}
}
@Autowired
private AmqpProducer amqpProducer;
public void callerConnection(WebSocketSession session,Room room,Long userId){
boolean isFirst = false;
Integer status = room.getStatus();
@@ -121,6 +123,7 @@ public class OpenLogic {
roomDataCache.setStatus(room.getRoomId(),RoomStatusEnums.STATUS_CALLER_CONNECT);
onlineDataCache.add(room.getRoomId());
userService.updateVideoStatus(userId,1);
amqpProducer.sendCommonDelayMq(1,room.getRoomId(),182);
isFirst = true;
}
// 已经接通
@@ -166,6 +169,7 @@ public class OpenLogic {
map.put("connectTime", DateUtil.currentSeconds());
map.put("heartTime",DateUtil.currentSeconds());
userDataCache.hMSet(room.getRoomId(), UserDataConstant.TYPE_RECEIVER,map);
amqpProducer.sendCommonDelayMq(1,room.getRoomId(),182);
// 房间号状态设置为 接收方已连接
boolean res = roomDataCache.setStatusReceiverConnection(room.getRoomId());
if(!res){

View File

@@ -85,6 +85,10 @@ public class RoomService {
}
}
/**
* 删除房间号 关联的ws链接 同时 踢人下线
* @param roomId
*/
public void closeAllFd(Long roomId){
List<String> sessionKeysByRoomId = roomCtxCache.getSessionKeysByRoomId(roomId);
for (String sessionKey : sessionKeysByRoomId) {
@@ -117,6 +121,17 @@ public class RoomService {
return null;
}
/**
* 挂断房间
* 将房间状态设置为 通话结束,同时设置结束时间
*
* @param roomId
* @return
*/
public boolean hangUp(Long roomId){
return roomDataCache.hangUp(roomId);
}
public Long getCallTime(Room room) {
RoomData roomData = room.getRoomData();
Long beginTime = roomData.getBeginTime();

View File

@@ -1,22 +1,39 @@
package com.ruoyi.cai.ws.service;
import cn.hutool.core.date.DateUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.ruoyi.cai.domain.Account;
import com.ruoyi.cai.dto.video.VideoSettleResp;
import com.ruoyi.cai.dto.video.WithholdingFeeUserResp;
import com.ruoyi.cai.manager.LockManager;
import com.ruoyi.cai.service.AccountService;
import com.ruoyi.cai.trd.ImDataRes;
import com.ruoyi.cai.trd.ImMsgGen;
import com.ruoyi.cai.ws.cache.CallerRoomCache;
import com.ruoyi.cai.ws.cache.RoomCtxCache;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.dto.WsR;
import com.ruoyi.cai.ws.dto.WsRMsgGen;
import com.ruoyi.cai.ws.util.RoomWebSocketUtil;
import com.ruoyi.yunxin.Yunxin;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.RoomData;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.cache.RoomDataCache;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.Roman;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
@@ -32,94 +49,124 @@ public class SettleService {
private OnlineDataCache onlineDataCache;
@Autowired
private Yunxin yunxin;
@Autowired
private RoomCtxCache roomCtxCache;
@Autowired
private RedissonClient redissonClient;
public void processOn(Room room){
/**
* 预扣费
* @param roomId
*/
public boolean withholdingFee(Long roomId){
Room room = roomService.load(roomId);
if(room == null || !room.isOnline()){
log.warn("房间已经不存在或者不在线 无需预扣费 roomId={}",roomId);
return false;
}
SettleService settleService = SpringUtil.getBean(SettleService.class);
try {
deal(room);
}finally {
Long userId = room.getCallUserData().getId();
Long price = room.getRoomData().getCallPrice();
settleService.withholdingFeeUser(userId,price,room);
return true;
}catch (Exception e){
log.error("预扣费失败!准备挂断电话",e);
boolean b = roomService.hangUp(room.getRoomId());
if(b){
// 结算操作
settleService.processOn(room);
}
// 向客户端发送挂断指令
WsR r = WsRMsgGen.hangup("拨打方余额不足", room.getRoomId(), HangUpEnums.NOTMONEY.getCode());
List<String> sessionKeys = roomCtxCache.getSessionKeysByRoomId(roomId);
RoomWebSocketUtil.sendSendMessage(sessionKeys, r);
roomService.closeAllFd(roomId);
return false;
}
}
/**
* 预扣费业务操作
*/
@Transactional(rollbackFor = Exception.class)
public void withholdingFeeUser(Long userId,Long price,Room room){
WithholdingFeeUserResp resp = accountService.withholdingFeeUser(userId, price);
roomDataCache.incsBlackAmount(room.getRoomId(),resp.getDecrCoin(),resp.getDecrIncomeCoin());
}
/**
* 结算处理
* @param room
*/
@SneakyThrows
public void processOn(Room room){
String lock = LockManager.getVideoSettleLock(room.getRoomId());
RLock clientLock = redissonClient.getLock(lock);
boolean locked = clientLock.isLocked();
if(locked){
log.info("正在结算中,稍等!");
return;
}
boolean lockFlag = clientLock.tryLock(5, TimeUnit.SECONDS);
if(!lockFlag){
log.info("正在结算中,稍等!");
return;
}
try {
deal(room);
}finally {
clientLock.unlock();
}
}
/**
* 结算内部整合逻辑
* @param room
*/
private void deal(Room room){
if(room.isCanCall() || room.isSettle()){
return;
}
if(!room.isReleaseRes()){
roomService.releaseRes(Long.valueOf(room.getRoomId()));
if(!room.isReleaseRes()){ // 房间资源是否已经释放
roomService.releaseRes(room.getRoomId());
return;
}
// 结算
if(!settle(room)){
return;
}
// 移除在线房间状态
onlineDataCache.remove(room.getRoomId());
// 更新支付金额信息
Map<String,Object> map = new HashMap<>();
map.put("payCoin", ""); // 实际支付的金额
map.put("payIncome", "");
map.put("settleTime", DateUtil.currentSeconds());
roomDataCache.hMSet(Long.valueOf(room.getRoomId()),map);
// 主叫方通话市场通知
Long callTime = roomService.getCallTime(room);
Long receiverUserId = room.getReceiverUserData().getId();
Long callUserId = room.getCallUserData().getId();
if(callTime > 0){
ImDataRes imDataRes = ImMsgGen.callNotice(4, receiverUserId, callUserId, callTime);
yunxin.sendTo(room.getCallUserData().getId(),room.getReceiverUserData().getId(),imDataRes);
}
// 收入通知
if(room != null){ // TODO修改数据
}
// 邀请人收入通知
// 排行榜通知
}
/**
* 结算
*/
private boolean settle(Room room){
Long roomId = room.getRoomId();
// 未通话,无需结算
if(room.getRoomData().getBeginTime() == null){
roomDataCache.hMSet(roomId,"settleTime", DateUtil.currentSeconds());
return false;
return;
}
// 开始结算
// 呼叫方扣费
this.computeCallerPay(room);
// 结束结算
return true;
}
private void computeCallerPay(Room room) {
RoomData roomData = room.getRoomData();
UserData callUserData = room.getCallUserData();
Long callPrice = roomData.getCallPrice();
VideoSettleResp resp = accountService.videoSettle(room);
// 修改房间缓存
Map<String,Object> map = new HashMap<>();
map.put("payCoin",resp.getPayCoin());
map.put("payIncome",resp.getPayIncome());
map.put("settleTime", DateUtil.currentSeconds());
roomDataCache.hMSet(roomId,map);
// 移除在线房间状态
onlineDataCache.remove(room.getRoomId());
// 主叫方时长通知
Long callTime = roomService.getCallTime(room);
// 本次支付金额
Long totalAmount = callPrice * (callTime / 60);
// 补差价
Long diff = totalAmount - roomData.getPayCoin() + roomData.getPayIncome();
Account callAccount = accountService.getByUserId(callUserData.getId());
Long userAmount = callAccount.getCoin() + callAccount.getIncomeCoin();
if(diff > 0){
// 账户上面有余额
if(userAmount > 0){
diff = (userAmount > diff) ? diff : userAmount;
log.info("roomid {} 已支付 {} 仍需要补差价:{}", roomData.getRoomId(),roomData.getPayCoin() + roomData.getPayIncome(),diff);
// TODO 对账户进行统一扣费
}else{
log.info("房间需要补差价,但用户余额不足 roomid {} 已支付 {} 仍需要补差价:{}", roomData.getRoomId(),roomData.getPayCoin() + roomData.getPayIncome(),diff);
}
}else{
// 退钱逻辑
diff = Math.abs(diff);
// TODO 对账户进行统一退钱逻辑
Long callId = room.getCallUserData().getId();
Long receiveId = room.getReceiverUserData().getId();
if(callTime > 0){
ImDataRes imDataRes = ImMsgGen.callNotice(4, callId, receiveId, callTime);
yunxin.sendTo(callId,receiveId,imDataRes);
}
// 接收方通知
Long anchorIncome = resp.getAnchorIncome();
if(anchorIncome > 0){
ImDataRes imDataRes = ImMsgGen.callNotice(4, receiveId, callId, callTime);
yunxin.sendTo(receiveId,callId,imDataRes);
// 收入通知
// ImMsgGen.videoIncome();
// yunxin.sendTo(receiveId,null,)
}
}
}