This commit is contained in:
77
2024-05-21 11:19:46 +08:00
parent 5dc1ca2521
commit 640aa3237e
13 changed files with 101 additions and 20 deletions

View File

@@ -1,6 +1,7 @@
package com.ruoyi.web.controller.cai.app; package com.ruoyi.web.controller.cai.app;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.ruoyi.cai.constant.CommonConstant;
import com.ruoyi.cai.dto.app.query.StarOrVisitorReq; import com.ruoyi.cai.dto.app.query.StarOrVisitorReq;
import com.ruoyi.cai.dto.app.query.VisitorQuery; import com.ruoyi.cai.dto.app.query.VisitorQuery;
import com.ruoyi.cai.dto.app.vo.user.UserStarOrVisitorList; import com.ruoyi.cai.dto.app.vo.user.UserStarOrVisitorList;
@@ -45,7 +46,14 @@ public class UserVisitorAppController {
@Log(title = "我的浏览记录、访客查询-分页", businessType = BusinessType.OTHER,isPrintResponseData = false, isSaveDb = false) @Log(title = "我的浏览记录、访客查询-分页", businessType = BusinessType.OTHER,isPrintResponseData = false, isSaveDb = false)
public R<List<UserStarOrVisitorList>> page(VisitorQuery query, PageQuery pageQuery){ public R<List<UserStarOrVisitorList>> page(VisitorQuery query, PageQuery pageQuery){
Page<UserStarOrVisitorList> res = userVisitorService.pageApp(pageQuery,query); Page<UserStarOrVisitorList> res = userVisitorService.pageApp(pageQuery,query);
return R.ok(res.getRecords()); List<UserStarOrVisitorList> records = res.getRecords();
for (UserStarOrVisitorList record : records) {
if(record.getCity() == null){
record.setCity(CommonConstant.CITY);
record.setCityId(CommonConstant.CITY_ID);
}
}
return R.ok(records);
} }
} }

View File

@@ -0,0 +1,7 @@
package com.ruoyi.cai.constant;
public class CommonConstant {
public static final String CITY = "北京";
public static final Integer CITY_ID = 110100;
}

View File

@@ -21,7 +21,6 @@ public class AmqpWsProducer {
}); });
} }
@Deprecated
public void sendRoomSettleDelay(String message, Integer timeout){ public void sendRoomSettleDelay(String message, Integer timeout){
rabbitTemplate.convertAndSend(RoomSettleDelayWsMqConstant.EXCHANGE_NAME, rabbitTemplate.convertAndSend(RoomSettleDelayWsMqConstant.EXCHANGE_NAME,
RoomSettleDelayWsMqConstant.ROUTING_KEY, RoomSettleDelayWsMqConstant.ROUTING_KEY,

View File

@@ -149,7 +149,7 @@ public class DynamicServiceImpl extends ServiceImpl<DynamicMapper, Dynamic> impl
List<DynamicAddReq.DynamicImageAddReq> imageList = res.getImageList(); List<DynamicAddReq.DynamicImageAddReq> imageList = res.getImageList();
if(CollectionUtil.isNotEmpty(imageList)){ if(CollectionUtil.isNotEmpty(imageList)){
for (DynamicAddReq.DynamicImageAddReq imageVo : imageList) { for (DynamicAddReq.DynamicImageAddReq imageVo : imageList) {
CaiFileUtils.FileSize fileSize = CaiFileUtils.getFileSize(imageVo.getUrl()); CaiFileUtils.FileSize fileSize = CaiFileUtils.getFastFileSize(imageVo.getUrl());
DynamicImages po = new DynamicImages(); DynamicImages po = new DynamicImages();
po.setUserId(dynamic.getUserId()); po.setUserId(dynamic.getUserId());
po.setDynamicId(dynamic.getId()); po.setDynamicId(dynamic.getId());

View File

@@ -20,6 +20,10 @@ public class CaiFileUtils {
private static String BASE_URL = "https://nono-1257812345.cos.ap-shanghai.myqcloud.com/"; private static String BASE_URL = "https://nono-1257812345.cos.ap-shanghai.myqcloud.com/";
public static FileSize getFastFileSize(String url){
return new FileSize();
}
public static FileSize getFileSize(String url){ public static FileSize getFileSize(String url){
FileSize fileSize = new FileSize(); FileSize fileSize = new FileSize();
String suffix = StringUtils.substring(url, url.lastIndexOf("."), url.length()); String suffix = StringUtils.substring(url, url.lastIndexOf("."), url.length());
@@ -36,6 +40,5 @@ public class CaiFileUtils {
log.error("获取文件图片大小失败",e); log.error("获取文件图片大小失败",e);
} }
return new FileSize(); return new FileSize();
} }
} }

View File

@@ -4,9 +4,12 @@ import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.UserCall; import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.notice.YunxinWsServiceV2; import com.ruoyi.cai.notice.YunxinWsServiceV2;
import com.ruoyi.cai.service.AnchorService;
import com.ruoyi.cai.service.UserCallService; import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.service.UserService;
import com.ruoyi.cai.ws.bean.FdCtxData; import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room; import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.constant.HangUpEnums; import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums; import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.dto.WsRMsgGen; import com.ruoyi.cai.ws.dto.WsRMsgGen;
@@ -28,6 +31,10 @@ public class CancelMessageHandler extends AbstractMessageHandle implements IMess
private RoomService roomService; private RoomService roomService;
@Autowired @Autowired
private YunxinWsServiceV2 yunxinWsService; private YunxinWsServiceV2 yunxinWsService;
@Autowired
private UserService userService;
@Autowired
private AnchorService anchorService;
@Override @Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) { public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
@@ -54,6 +61,12 @@ public class CancelMessageHandler extends AbstractMessageHandle implements IMess
userCallService.update(Wrappers.lambdaUpdate(UserCall.class) userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId) .eq(UserCall::getId,roomId)
.set(UserCall::getStatus,RoomStatusEnums.STATUS_CALLER_CANCEL.getCode())); .set(UserCall::getStatus,RoomStatusEnums.STATUS_CALLER_CANCEL.getCode()));
// 释放主播的接听状态
UserData receiverUserData = room.getReceiverUserData();
if(receiverUserData != null && receiverUserData.getConnectTime() != null && receiverUserData.getConnectTime() > 0){
userService.updateVideoStatus(receiverId,0);
anchorService.updateVideoStatus(receiverId,0);
}
} }
} }

View File

@@ -4,9 +4,12 @@ import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.UserCall; import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.notice.YunxinWsServiceV2; import com.ruoyi.cai.notice.YunxinWsServiceV2;
import com.ruoyi.cai.service.AnchorService;
import com.ruoyi.cai.service.UserCallService; import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.service.UserService;
import com.ruoyi.cai.ws.bean.FdCtxData; import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.bean.Room; import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.constant.HangUpEnums; import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums; import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.dto.WsRMsgGen; import com.ruoyi.cai.ws.dto.WsRMsgGen;
@@ -28,6 +31,10 @@ public class RefuseMessageHandler extends AbstractMessageHandle implements IMess
private UserCallService userCallService; private UserCallService userCallService;
@Autowired @Autowired
private YunxinWsServiceV2 yunxinWsService; private YunxinWsServiceV2 yunxinWsService;
@Autowired
private UserService userService;
@Autowired
private AnchorService anchorService;
@Override @Override
public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) { public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) {
@@ -46,11 +53,16 @@ public class RefuseMessageHandler extends AbstractMessageHandle implements IMess
Long receiverId = room.getReceiverId(); Long receiverId = room.getReceiverId();
Long callerId = room.getCallId(); Long callerId = room.getCallId();
yunxinWsService.sendToCallNotify(callerId,receiverId, CallNoticeEnum.REFUSE,0L); yunxinWsService.sendToCallNotify(callerId,receiverId, CallNoticeEnum.REFUSE,0L);
// 更新房间状态 // 更新房间状态
userCallService.update(Wrappers.lambdaUpdate(UserCall.class) userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,room.getRoomId()) .eq(UserCall::getId,room.getRoomId())
.set(UserCall::getStatus,RoomStatusEnums.STATUS_REFUSE.getCode())); .set(UserCall::getStatus,RoomStatusEnums.STATUS_REFUSE.getCode()));
// 释放主播的接听状态
UserData receiverUserData = room.getReceiverUserData();
if(receiverUserData != null && receiverUserData.getConnectTime() != null && receiverUserData.getConnectTime() > 0){
userService.updateVideoStatus(receiverId,0);
anchorService.updateVideoStatus(receiverId,0);
}
} }

View File

@@ -28,7 +28,9 @@ public class WebSocketSessionHolder {
} }
public static void addSession(String sessionKey, WebSocketSession session,Long userId) { public static void addSession(String sessionKey, WebSocketSession session,Long userId) {
log.info("收到sessionId={} userId={}",sessionKey, userId); Object roomIdObj = session.getAttributes().get("room_id");
String roomId = String.valueOf(roomIdObj);
log.info("收到sessionId={} roomId={} userId={}",sessionKey, roomId, userId);
USER_SESSION_MAP.put(sessionKey, session); USER_SESSION_MAP.put(sessionKey, session);
if(USER_SESSION_ID_MAP.containsKey(userId)){ // T人动作 if(USER_SESSION_ID_MAP.containsKey(userId)){ // T人动作
String sessionKeyOld = USER_SESSION_ID_MAP.get(userId); String sessionKeyOld = USER_SESSION_ID_MAP.get(userId);
@@ -37,10 +39,11 @@ public class WebSocketSessionHolder {
if(webSocketSession != null){ if(webSocketSession != null){
if(webSocketSession.isOpen()){ if(webSocketSession.isOpen()){
try { try {
log.info("检测到该用户存在历史的sessionId={} userId={} 清理掉",sessionKey, userId); log.info("检测到该用户存在历史的sessionId={} roomId={} userId={} 清理掉",sessionKey, roomId, userId);
webSocketSession.close(new CloseStatus(500100)); webSocketSession.close(new CloseStatus(500100));
} catch (IOException e) { } catch (IOException e) {
// ignore // ignore
log.error("关闭房间失败",e);
} }
} }
removeSession(sessionKeyOld); removeSession(sessionKeyOld);

View File

@@ -191,12 +191,6 @@ public class RoomService {
userService.updateVideoStatus(receiverId,0); userService.updateVideoStatus(receiverId,0);
anchorService.updateVideoStatus(receiverId,0); anchorService.updateVideoStatus(receiverId,0);
} }
// 声网踢人
if(room.getRoomData().getBeginTime() != null && room.getRoomData().getBeginTime() > 0){
ExecutorConstant.ROOM_EXECUTOR.execute(() -> {
agora.closeChannel(roomId);
});
}
// 修改释放状态 // 修改释放状态
Map<String,Object> map = new HashMap<>(); Map<String,Object> map = new HashMap<>();
map.put("releaseRes",true); map.put("releaseRes",true);

View File

@@ -4,10 +4,12 @@ import cn.hutool.core.date.DateUtil;
import cn.hutool.extra.spring.SpringUtil; import cn.hutool.extra.spring.SpringUtil;
import com.ruoyi.cai.dto.video.VideoSettleResp; import com.ruoyi.cai.dto.video.VideoSettleResp;
import com.ruoyi.cai.dto.video.WithholdingFeeUserResp; import com.ruoyi.cai.dto.video.WithholdingFeeUserResp;
import com.ruoyi.cai.executor.ExecutorConstant;
import com.ruoyi.cai.manager.ConsumerManager; import com.ruoyi.cai.manager.ConsumerManager;
import com.ruoyi.cai.manager.LockManager; import com.ruoyi.cai.manager.LockManager;
import com.ruoyi.cai.notice.YunxinWsServiceV2; import com.ruoyi.cai.notice.YunxinWsServiceV2;
import com.ruoyi.cai.service.AccountService; import com.ruoyi.cai.service.AccountService;
import com.ruoyi.cai.trd.Agora;
import com.ruoyi.cai.ws.bean.Room; import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.cache.OnlineDataCache; import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.cache.RoomCtxCache; import com.ruoyi.cai.ws.cache.RoomCtxCache;
@@ -45,6 +47,8 @@ public class SettleService {
private RoomCtxCache roomCtxCache; private RoomCtxCache roomCtxCache;
@Autowired @Autowired
private RedissonClient redissonClient; private RedissonClient redissonClient;
@Autowired
private Agora agora;
/** /**
@@ -62,17 +66,18 @@ public class SettleService {
Long userId = room.getCallUserData().getId(); Long userId = room.getCallUserData().getId();
Integer price = room.getRoomData().getCallPrice(); Integer price = room.getRoomData().getCallPrice();
settleService.withholdingFeeUser(userId,Long.valueOf(price),room); settleService.withholdingFeeUser(userId,Long.valueOf(price),room);
log.info("roomId={} 拨打方 1分钟扣分成功",roomId);
// 给双方推送可通话时长 // 给双方推送可通话时长
try { try {
Long time = roomService.canCallTime(room); Long time = roomService.canCallTime(room);
List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId); List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId);
RoomWebSocketUtil.sendSendMessage(keys, WsRMsgGen.canCallTime(time)); RoomWebSocketUtil.sendSendMessage(keys, WsRMsgGen.canCallTime(time));
}catch (Exception e){ }catch (Exception e){
log.error("扣费后-推送可通话时长失败!",e); log.error("roomId={} 扣费后-推送可通话时长失败!",roomId, e);
} }
return true; return true;
}catch (Exception e){ }catch (Exception e){
log.error("预扣费失败!准备挂断电话",e); log.error("roomId={} 预扣费失败!准备挂断电话",roomId,e);
boolean b = roomService.hangUp(room.getRoomId()); boolean b = roomService.hangUp(room.getRoomId());
if(b){ if(b){
// 结算操作 // 结算操作
@@ -140,23 +145,30 @@ public class SettleService {
* @param room * @param room
*/ */
private SettleResp deal(Room room){ private SettleResp deal(Room room){
if(room.isCanCall()){ // 正在通话中,无需在结算 if(room.isCanCall()){
log.info("roomId={} 房间正在通话中,无需结算",room.getRoomId());
return SettleResp.builder().nextRun(true).build(); return SettleResp.builder().nextRun(true).build();
} }
if(room.isSettle()){ // 已经结算了,无需结算 if(room.isSettle()){
log.info("roomId={} 已经结算了,无需结算",room.getRoomId());
return SettleResp.builder().nextRun(false).build(); return SettleResp.builder().nextRun(false).build();
} }
if(!room.isReleaseRes()){ // 房间资源是否已经释放 if(!room.isReleaseRes()){ // 房间资源是否已经释放
roomService.releaseRes(room.getRoomId()); // 释放房间会更新用户和主播的状态 roomService.releaseRes(room.getRoomId()); // 释放房间会更新用户和主播的状态
} }
Long roomId = room.getRoomId(); Long roomId = room.getRoomId();
// 未通话,无需结算
VideoSettleResp resp = new VideoSettleResp(); VideoSettleResp resp = new VideoSettleResp();
if(room.getRoomData().getBeginTime() == null){ if(room.getRoomData().getBeginTime() == null){
roomDataCache.hMSet(roomId,"settleTime", DateUtil.currentSeconds()); roomDataCache.hMSet(roomId,"settleTime", DateUtil.currentSeconds());
}else{ }else{
resp = consumerManager.videoSettle(room); resp = consumerManager.videoSettle(room);
} }
// 声网踢人
if(room.getRoomData().getBeginTime() != null && room.getRoomData().getBeginTime() > 0){
ExecutorConstant.ROOM_EXECUTOR.execute(() -> {
agora.closeChannel(roomId);
});
}
// 修改房间缓存 // 修改房间缓存
Map<String,Object> map = new HashMap<>(); Map<String,Object> map = new HashMap<>();
map.put("payCoin",resp.getPayCoin()); map.put("payCoin",resp.getPayCoin());

View File

@@ -26,9 +26,7 @@ public class RedisConsumer {
executorService.execute(() -> { executorService.execute(() -> {
while (true){ while (true){
try { try {
System.out.println();
run(); run();
System.out.println();
}catch (Exception e){ }catch (Exception e){
} }

View File

@@ -23,7 +23,10 @@ public class RoomSettleDelayMqConsumer {
boolean next = settleService.withholdingFee(Long.valueOf(message)); boolean next = settleService.withholdingFee(Long.valueOf(message));
if(next){ if(next){
// 1分钟后继续执行 // 1分钟后继续执行
log.info("预扣分完成 1分钟后继续后执行 {}", message);
amqpWsProducer.sendRoomSettleDelay(message,60); amqpWsProducer.sendRoomSettleDelay(message,60);
}else{
log.info("预扣费失败 可能余额不足 不在继续执行 {}", message);
} }
}catch (Exception e){ }catch (Exception e){
log.error("每分钟定时扣费失败!",e); log.error("每分钟定时扣费失败!",e);

View File

@@ -5,7 +5,9 @@ import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.cai.domain.UserCall; import com.ruoyi.cai.domain.UserCall;
import com.ruoyi.cai.notice.YunxinWsServiceV2; import com.ruoyi.cai.notice.YunxinWsServiceV2;
import com.ruoyi.cai.service.AnchorService;
import com.ruoyi.cai.service.UserCallService; import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.service.UserService;
import com.ruoyi.cai.ws.bean.Room; import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.UserData; import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.RoomCtxCache; import com.ruoyi.cai.ws.cache.RoomCtxCache;
@@ -90,12 +92,15 @@ public class RoomCheckJobService {
UserData receiverUserData = room.getReceiverUserData(); UserData receiverUserData = room.getReceiverUserData();
boolean timeOut = false; boolean timeOut = false;
WsR hangup = null; WsR hangup = null;
String logMessage = null;
if(isHeartTimeout(callUserData)){ if(isHeartTimeout(callUserData)){
timeOut = true; timeOut = true;
hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode()); hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode());
logMessage = "呼叫方连接中断";
}else if(isHeartTimeout(receiverUserData)){ }else if(isHeartTimeout(receiverUserData)){
timeOut = true; timeOut = true;
hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode()); hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode());
logMessage = "接听方连接中断";
} }
if(timeOut){ if(timeOut){
boolean nextCreateJob = false; boolean nextCreateJob = false;
@@ -109,6 +114,8 @@ public class RoomCheckJobService {
List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId); List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId);
RoomWebSocketUtil.sendSendMessage(keys, hangup); RoomWebSocketUtil.sendSendMessage(keys, hangup);
roomService.closeAllFd(roomId); roomService.closeAllFd(roomId);
updateAnchorVideoStatus(room);
log.info("roomId={} 心跳检测发现{} 关闭房间",roomId,logMessage);
return JobResp.builder().nextCreateJob(nextCreateJob).build(); return JobResp.builder().nextCreateJob(nextCreateJob).build();
} }
return JobResp.builder().nextCreateJob(true).build(); return JobResp.builder().nextCreateJob(true).build();
@@ -148,6 +155,8 @@ public class RoomCheckJobService {
if(!nextCreateJob){ if(!nextCreateJob){
yunxinWsService.sendToCallNotify(callUserData.getId(),receiverUserData.getId(), CallNoticeEnum.TIMEOUT,0L); yunxinWsService.sendToCallNotify(callUserData.getId(),receiverUserData.getId(), CallNoticeEnum.TIMEOUT,0L);
} }
updateAnchorVideoStatus(room);
log.info("roomId={} 主播两分钟没有接听, 房间关闭 ",room.getRoomId());
return JobResp.builder().nextCreateJob(nextCreateJob).build(); return JobResp.builder().nextCreateJob(nextCreateJob).build();
} }
return JobResp.builder().nextCreateJob(true).build(); return JobResp.builder().nextCreateJob(true).build();
@@ -172,6 +181,26 @@ public class RoomCheckJobService {
return true; return true;
} }
@Autowired
private UserService userService;
@Autowired
private AnchorService anchorService;
private void updateAnchorVideoStatus(Room room){
try {
// 释放主播的接听状态
UserData receiverUserData = room.getReceiverUserData();
if(receiverUserData != null && receiverUserData.getConnectTime() != null && receiverUserData.getConnectTime() > 0){
userService.updateVideoStatus(receiverUserData.getId(),0);
anchorService.updateVideoStatus(receiverUserData.getId(),0);
}
log.info("roomId={} 主播状态释放完毕",room.getRoomId());
}catch (Exception e){
log.info("roomId={} 主播状态释放失败",room.getRoomId(), e);
}
}