This commit is contained in:
dute7liang
2024-01-14 03:38:50 +08:00
parent 7a6f9eecd3
commit d5deaacfe8
32 changed files with 323 additions and 135 deletions

View File

@@ -1,7 +1,6 @@
package com.ruoyi.cai.chat;
import cn.hutool.core.map.MapUtil;
import com.ruoyi.cai.config.CaiProperties;
import com.ruoyi.cai.domain.Account;
import com.ruoyi.cai.domain.Anchor;
import com.ruoyi.cai.domain.User;
import com.ruoyi.cai.domain.UserCall;
@@ -12,10 +11,7 @@ import com.ruoyi.cai.dto.app.vo.chat.GetRoomResp;
import com.ruoyi.cai.enums.SystemConfigEnum;
import com.ruoyi.cai.manager.IdManager;
import com.ruoyi.cai.manager.SystemConfigManager;
import com.ruoyi.cai.service.AnchorService;
import com.ruoyi.cai.service.GuardTotalService;
import com.ruoyi.cai.service.UserCallService;
import com.ruoyi.cai.service.UserService;
import com.ruoyi.cai.service.*;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.dto.WsToken;
@@ -23,15 +19,16 @@ import com.ruoyi.cai.ws.manager.WebSocketManager;
import com.ruoyi.cai.ws.util.MapGetUtil;
import com.ruoyi.common.exception.ServiceException;
import com.ruoyi.common.helper.LoginHelper;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -48,11 +45,11 @@ public class ChatManager {
@Autowired
private SystemConfigManager systemConfigManager;
@Autowired
private CaiProperties properties;
@Autowired
private GuardTotalService guardTotalService;
@Autowired
private StringRedisTemplate redisTemplate;
private RedissonClient redissonClient;
@Autowired
private AccountService accountService;
private String setWsToken(Long roomId,Long fromUid,Long toUid,Long userId){
String token = IdManager.nextIdStr();
@@ -62,14 +59,16 @@ public class ChatManager {
map.put("fromUid",fromUid);
map.put("toUid",toUid);
map.put("userId",userId);
redisTemplate.opsForHash().putAll(tokenKey,map);
redisTemplate.expire(tokenKey,1, TimeUnit.DAYS);
RMap<Object, Object> mapRedis = redissonClient.getMap(tokenKey);
mapRedis.putAll(map);
mapRedis.expire(Duration.ofDays(1));
return token;
}
public WsToken getToken(String token){
String tokenKey = String.format(RedisConstant.WS_TOKEN, token);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(tokenKey);
RMap<Object, Object> mapRedis = redissonClient.getMap(tokenKey);
Map<Object, Object> entries = mapRedis.readAllMap();
if(entries.isEmpty()){
return null;
}
@@ -85,6 +84,9 @@ public class ChatManager {
Long userId = LoginHelper.getUserId();
User fromUser = userService.getById(userId);
User toUser = userService.getById(callReq.getToUid());
if(toUser == null){
throw new ServiceException("主播不存在");
}
if(toUser.getIsAnchor() != 1){
throw new ServiceException("对方未通过女神认证,不能接听视频");
}
@@ -95,11 +97,21 @@ public class ChatManager {
if(anchor == null){
throw new ServiceException("主播技能不存在");
}
Account account = accountService.getByUserId(userId);
if(account == null){
throw new ServiceException("账户异常,请联系客服");
}
long coin = account.getIncomeCoin() + account.getCoin();
if(coin < anchor.getPrice()*2){
throw new ServiceException("您的余额不足,请充值");
}
Long roomId = null;
Room room = webSocketManager.checkOnlineRoom(fromUser.getId(), toUser.getId());
if(room == null){
UserCall call = userCallService.createCall(fromUser, toUser, anchor);
roomId = webSocketManager.createRoom(call.getId());
}else{
roomId = room.getRoomId();
}
String wsSocketUrlFormat = systemConfigManager.getSystemConfig(SystemConfigEnum.WS_SOCKET_URL);
String token = setWsToken(roomId, fromUser.getId(), toUser.getId(),userId);

View File

@@ -0,0 +1,29 @@
package com.ruoyi.cai.controller.admin;
import com.ruoyi.cai.mq.AmqpProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/cai/mq/test")
public class MqControllerTest {
@Autowired
private AmqpProducer amqpProducer;
@GetMapping("/send")
public void send(String message,Integer time){
amqpProducer.sendCheckTimeOut(message,time);
}
@GetMapping("/send2")
public void send(String message){
amqpProducer.sendCalculateSales(message);
}
}

View File

@@ -48,7 +48,6 @@ public class UserCall implements Serializable {
* 结束通话时间
*/
private LocalDateTime endTime;
private String skillName;
/**
* 通话时长
*/
@@ -87,6 +86,8 @@ public class UserCall implements Serializable {
@Deprecated
private Integer freeNum;
private Long traceId;
private LocalDateTime createTime;
}

View File

@@ -28,7 +28,7 @@ public enum SystemConfigEnum {
SMS_CODE_ADMIN("", "万能验证码",SystemConfigGroupEnum.SYSTEM),
PASSWORD_ADMIN("", "公用密码",SystemConfigGroupEnum.SYSTEM),
SYSTEM_CUSTOMER_SERVICE("", "系统客服",SystemConfigGroupEnum.SYSTEM),
WS_SOCKET_URL("ws://localhost:8080?token=%s&room_id=%s", "ws通讯地址",SystemConfigGroupEnum.SYSTEM),
WS_SOCKET_URL("ws://localhost:8080/ws?token=%s&room_id=%s", "ws通讯地址",SystemConfigGroupEnum.SYSTEM),
;

View File

@@ -29,8 +29,8 @@ public class AmqpProducer {
CommonDelayDto dto = new CommonDelayDto();
dto.setType(type);
dto.setRoomId(roomId);
rabbitTemplate.convertAndSend(CheckTimeOutMqConfig.EXCHANGE_NAME,
CheckTimeOutMqConfig.ROUTING_KEY,
rabbitTemplate.convertAndSend(CommonDelayMqConfig.EXCHANGE_NAME,
CommonDelayMqConfig.ROUTING_KEY,
JSON.toJSONString(dto),
messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒

View File

@@ -34,7 +34,7 @@ public class CheckTimeOutMqConfig {
@Bean
public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();
}
}

View File

@@ -15,7 +15,7 @@ public class CommonDelayMqConfig {
public static final String ROUTING_KEY = "commonDelayRouting";
@Bean
public CustomExchange delayedExchange() {
public CustomExchange commonDelayedExchange() {
HashMap<String,Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(EXCHANGE_NAME,
@@ -26,15 +26,15 @@ public class CommonDelayMqConfig {
}
@Bean
public Queue delayedQueue() {
public Queue commonDelayedQueue() {
return QueueBuilder.durable(QUEUE_NAME)
.withArgument("x-delayed-type", "direct")
.build();
}
@Bean
public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
public Binding commonDelayedBinding(Queue commonDelayedQueue,CustomExchange commonDelayedExchange) {
return BindingBuilder.bind(commonDelayedQueue()).to(commonDelayedExchange()).with(ROUTING_KEY).noargs();
}
}

View File

@@ -20,11 +20,16 @@ public class CheckTimeOutMqConsumer {
@RabbitListener(queues = CheckTimeOutMqConfig.QUEUE_NAME
,containerFactory = "customContainerFactory")
public void checkTimeOutMq(String message) {
log.info("checkTimeOutMq: " + message);
boolean next = settleService.withholdingFee(Long.valueOf(message));
if(next){
// 1分钟后继续执行
amqpProducer.sendCheckTimeOut(message,60);
log.info("开始执行预扣费: " + message);
try {
boolean next = settleService.withholdingFee(Long.valueOf(message));
if(next){
// 1分钟后继续执行
amqpProducer.sendCheckTimeOut(message,60);
}
}catch (Exception e){
log.error("每分钟定时扣费失败!",e);
}
}
}

View File

@@ -282,7 +282,7 @@ public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> impl
Long callPrice = roomData.getCallPrice();
Long callTime = roomService.getCallTime(room);
// 本次支付金额
long totalAmount = callPrice * (callTime / 60); // 本次需要支付的金额
long totalAmount = callPrice * ((callTime / 60) + 1); // 本次需要支付的金额
Long payCoin = roomData.getPayCoin(); // 已经支付的余额
Long payIncome = roomData.getPayIncome(); // 已经支付的收益
// 补差价
@@ -357,9 +357,10 @@ public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> impl
consumeLogService.save(consumeLog);
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId, userCall.getId())
.set(UserCall::getTraceId,tractId)
.set(UserCall::getBeginTime, DateUtil.date(roomData.getBeginTime()).toLocalDateTime())
.set(UserCall::getEndTime,DateUtil.date(roomData.getHangUpTime()).toLocalDateTime())
.set(UserCall::getCallTime,roomData.getHangUpTime() - roomData.getBeginTime())
.set(UserCall::getEndTime,DateUtil.date(roomData.getHangupTime()).toLocalDateTime())
.set(UserCall::getCallTime,roomData.getHangupTime() - roomData.getBeginTime())
.set(UserCall::getCallAmount,amountReal)
.set(UserCall::getCallIncome, anchorAmount)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_HANGUP.getCode()));
@@ -367,6 +368,7 @@ public class AccountServiceImpl extends ServiceImpl<AccountMapper, Account> impl
resp.setConsumeLog(consumeLog);
resp.setPayIncome(payIncome);
resp.setPayCoin(payCoin);
resp.setAnchorIncome(anchorAmount);
return resp;
}

View File

@@ -47,7 +47,9 @@ public class UserCallServiceImpl extends ServiceImpl<UserCallMapper, UserCall> i
}
UserCall userCall = new UserCall();
userCall.setFromUid(fromUid);
userCall.setFromUsercode(fromUser.getUsercode());
userCall.setToUid(toUser.getId());
userCall.setToUsercode(toUser.getUsercode());
userCall.setCallPrice(anchor.getPrice());
userCall.setReceiverVideoDivide(anchor.getVideoRate());
boolean save = this.save(userCall);

View File

@@ -13,6 +13,7 @@ import com.ruoyi.cai.dto.app.vo.user.UserListVo;
import com.ruoyi.cai.dto.app.vo.user.OnlineStatusVo;
import com.ruoyi.cai.mapper.UserMapper;
import com.ruoyi.cai.service.*;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.common.core.domain.PageQuery;
import com.ruoyi.common.exception.ServiceException;
import com.ruoyi.common.helper.LoginHelper;
@@ -61,6 +62,8 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
private UserForbidService userForbidService;
@Autowired
private AnchorApplyService anchorApplyService;
@Autowired
private RoomService roomService;
@Override
public User getByUsername(String username) {
@@ -114,7 +117,9 @@ public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements Us
// 在线状态
OnlineStatusVo onlineStatusVo;
if(user.getIsAnchor() == 1 && anchor != null){
onlineStatusVo = userOnlineService.onlineStatus(userId,anchor.getOpenVideoStatus(),anchor.getVideoStatus(), user.getIsAnchor());
boolean runningVideo = roomService.checkRunningVideo(userId, anchor.getUserId());
onlineStatusVo = userOnlineService.onlineStatus(userId,anchor.getOpenVideoStatus(),
runningVideo?1:anchor.getVideoStatus(), user.getIsAnchor());
}else{
onlineStatusVo = userOnlineService.onlineStatus(userId,0,0, user.getIsAnchor());
}

View File

@@ -14,7 +14,7 @@ public class RoomData {
private BigDecimal videoDivide;
private Long payCoin = 0L;
private Long payIncome = 0L;
private Long hangUpTime; // 结束时间
private Long hangupTime; // 结束时间
private Long settleTime; // 结算时间

View File

@@ -2,19 +2,23 @@ package com.ruoyi.cai.ws.cache;
import com.ruoyi.cai.ws.constant.RedisConstant;
import org.apache.commons.lang3.BooleanUtils;
import org.redisson.api.RMap;
import org.redisson.api.RScript;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Component
public class CallerRoomCache {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
public String getKey(Long fromUserId){
return String.format(RedisConstant.CALLER_ROOM_DATA,fromUserId);
@@ -22,36 +26,46 @@ public class CallerRoomCache {
public Long getRoomId(Long fromUserId, Long toUserId){
String key = getKey(fromUserId);
Object roomId = redisTemplate.opsForHash().get(key, toUserId);
RMap<String, Object> map = redissonClient.getMap(key);
Object roomId = map.get(toUserId+"");
return roomId == null ? null : Long.valueOf(roomId.toString());
}
public void addRoom(Long fromUid, Long toUid, Long roomId) {
String key = getKey(fromUid);
redisTemplate.opsForHash().put(key,toUid,roomId);
redisTemplate.expire(key,7, TimeUnit.DAYS);
RMap<String, Object> map = redissonClient.getMap(key);
map.put(toUid+"",roomId+"");
map.expire(Duration.ofDays(7));
// map.expire(7,TimeUnit.DAYS);
// redisTemplate.opsForHash().put(key,toUid,roomId);
// redisTemplate.expire(key,7, TimeUnit.DAYS);
}
public Map<Object,Object> getAll(Long fromUid){
String key = getKey(fromUid);
return redisTemplate.opsForHash().entries(key);
RMap<Object, Object> map = redissonClient.getMap(key);
return map.readAllMap();
// return redisTemplate.opsForHash().entries(key);
}
public void del(Long fromUid) {
String key = getKey(fromUid);
redisTemplate.delete(key);
redissonClient.getMap(key).delete();
}
private final static String DEL_ROOM_LUA = "return KEYS[1]\n" +
private final static String DEL_ROOM_LUA =
"local r = tonumber(redis.call('hget',KEYS[1],ARGV[1]))\n" +
"if r == tonumber(ARGV[2]) then\n" +
" return redis.call('hdel',KEYS[1],ARGV[1])\n" +
"end\n" +
"return 0";
public boolean delRoom(Long receiverId, Long roomId) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(DEL_ROOM_LUA,Boolean.class);
Boolean execute = redisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), receiverId,roomId);
public boolean delRoom(Long callerId, Long receiverId, Long roomId) {
RScript script = redissonClient.getScript();
Boolean execute = script.eval(RScript.Mode.READ_WRITE, DEL_ROOM_LUA, RScript.ReturnType.BOOLEAN,
Collections.singletonList(getKey(callerId)), receiverId+"", roomId+"");
// DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(DEL_ROOM_LUA,Boolean.class);
// Boolean execute = redisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), receiverId,roomId);
return BooleanUtils.isTrue(execute);
}
}

View File

@@ -1,12 +1,10 @@
package com.ruoyi.cai.ws.cache;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.constant.RedisConstant;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.SearchStrategy;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@@ -21,24 +19,21 @@ import java.util.concurrent.TimeUnit;
@Component
public class FdCtxDataCache {
@Autowired
private StringRedisTemplate redisTemplate;
private RedissonClient redissonClient;
public String getKey(String sessionKey){
return String.format(RedisConstant.FDCTX_DATA,sessionKey);
}
public void save(FdCtxData fdCtxData) {
String str = JSON.toJSONString(fdCtxData);
redisTemplate.opsForValue().set(getKey(fdCtxData.getSessionKey()),str,5, TimeUnit.DAYS);
RBucket<FdCtxData> bucket = redissonClient.getBucket(getKey(fdCtxData.getSessionKey()));
bucket.set(fdCtxData,5,TimeUnit.DAYS);
}
public FdCtxData getByRoomId(String sessionKey){
String key = getKey(sessionKey);
String s = redisTemplate.opsForValue().get(key);
if(StringUtils.isEmpty(s)){
return null;
}
return JSON.parseObject(s,FdCtxData.class);
RBucket<FdCtxData> bucket = redissonClient.getBucket(key);
return bucket.get();
}
}

View File

@@ -1,6 +1,8 @@
package com.ruoyi.cai.ws.cache;
import com.ruoyi.cai.ws.constant.RedisConstant;
import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
@@ -12,20 +14,28 @@ import java.util.Set;
public class OnlineDataCache {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
public String getKey(){
return RedisConstant.ONLINE_ROOM_DATA;
}
public Set<String> getAll(){
return redisTemplate.opsForSet().members(getKey());
RSet<String> set = redissonClient.getSet(getKey());
return set.readAll();
// return redisTemplate.opsForSet().members(getKey());
}
public void add(Long roomId){
redisTemplate.opsForSet().add(getKey(), String.valueOf(roomId));
RSet<String> set = redissonClient.getSet(getKey());
set.add(String.valueOf(roomId));
// redisTemplate.opsForSet().add(getKey(), String.valueOf(roomId));
}
public void remove(Long roomId) {
redisTemplate.opsForSet().remove(getKey(),roomId);
RSet<String> set = redissonClient.getSet(getKey());
set.delete();
// redisTemplate.opsForSet().remove(getKey(),roomId);
}
}

View File

@@ -3,13 +3,18 @@ package com.ruoyi.cai.ws.cache;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.constant.UserDataConstant;
import com.ruoyi.cai.ws.holder.WebSocketSessionHolder;
import org.redisson.api.RMap;
import org.redisson.api.RSet;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@@ -22,6 +27,8 @@ import java.util.concurrent.TimeUnit;
public class RoomCtxCache {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
public String getKey(Long roomId){
return String.format(RedisConstant.FDCTX_ROOM_DATA,roomId);
@@ -29,23 +36,28 @@ public class RoomCtxCache {
public void addFd(String sessionKey,Long roomId,Integer userType){
String key = getKey(roomId);
redisTemplate.opsForHash().put(key,sessionKey,userType);
redisTemplate.expire(key,7, TimeUnit.DAYS);
RMap<Object, Object> map = redissonClient.getMap(key);
map.put(sessionKey,userType);
map.expire(Duration.ofDays(7));
}
public List<String> getSessionKeysByRoomId(Long roomId){
String key = getKey(roomId);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
List<String> res = new ArrayList<>();
for (Object o : entries.keySet()) {
res.add(String.valueOf(o));
RMap<String, Object> entries = redissonClient.getMap(key);
Set<String> set = entries.readAllKeySet();
List<String> keys = new ArrayList<>();
for (String sessionKey : set) {
if(WebSocketSessionHolder.existSession(sessionKey)){
keys.add(sessionKey);
}
}
return res;
return keys;
}
public String getSessionKeyByRoomIdAndUserType(Long roomId,Integer userType){
String key = getKey(roomId);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
RMap<Object, Object> map = redissonClient.getMap(key);
Map<Object, Object> entries = map.readAllMap();
for (Map.Entry<Object, Object> entry : entries.entrySet()) {
String sessionKey = String.valueOf(entry.getKey());
Integer userTypeK = Integer.valueOf(entry.getValue().toString());

View File

@@ -6,12 +6,16 @@ import com.ruoyi.cai.ws.bean.RoomData;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import org.apache.commons.lang3.BooleanUtils;
import org.redisson.api.RMap;
import org.redisson.api.RScript;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
@@ -26,6 +30,8 @@ public class RoomDataCache {
@Autowired
private RedissonClient redissonClient;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private StringRedisTemplate stringRedisTemplate;
public String getKey(Long roomId){
@@ -33,82 +39,105 @@ public class RoomDataCache {
}
public RoomData getByRoomId(Long roomId){
Map<Object, Object> map = stringRedisTemplate.opsForHash().entries(getKey(roomId));
if(map.get("roomId") == null){
RMap<String, Object> map = redissonClient.getMap(getKey(roomId));
Map<String, Object> allMap = map.readAllMap();
// Map<Object, Object> map = redisTemplate.opsForHash().entries(getKey(roomId));
if(allMap.get("roomId") == null){
return null;
}
return JSON.parseObject(JSON.toJSONString(map),RoomData.class);
return JSON.parseObject(JSON.toJSONString(allMap),RoomData.class);
}
public void init(RoomData roomData) {
Map<String,Object> map = JSON.parseObject(JSON.toJSONString(roomData));
stringRedisTemplate.opsForHash().putAll(getKey(roomData.getRoomId()),map);
// redisTemplate.opsForHash().putAll(getKey(roomData.getRoomId()),map);
RMap<String, Object> mapRedis = redissonClient.getMap(getKey(roomData.getRoomId()));
mapRedis.putAll(map);
mapRedis.expire(Duration.ofDays(15));
}
public void hMSet(Long roomId,Map<String,Object> data) {
String key = getKey(roomId);
stringRedisTemplate.opsForHash().putAll(key,data);
// redisTemplate.opsForHash().putAll(key,data);
RMap<Object, Object> map = redissonClient.getMap(key);
map.putAll(data);
}
public void hMSet(Long roomId,String mKey,Object mData) {
String key = getKey(roomId);
stringRedisTemplate.opsForHash().put(key,mKey,mData);
RMap<String, Object> map = redissonClient.getMap(key);
map.put(mKey,mData);
// redisTemplate.opsForHash().put(key,mKey,mData);
}
private final static String HANG_UP = "local hangupTime = tonumber(redis.call('hGet', KEYS[1], 'hangupTime'))\n" +
"if hangupTime > 0 then\n" +
private final static String HANG_UP = "local hangupTime = tonumber(redis.call('hGet', KEYS[1], '\"hangupTime\"'))\n" +
"if hangupTime and hangupTime > 0 then\n" +
" return 0\n" +
"end\n" +
"return redis.call('hMSet', KEYS[1], 'status', 8, 'hangupTime', ARGV[1])";
"return redis.call('hMSet', KEYS[1], '\"status\"', 8, '\"hangupTime\"', ARGV[1])";
public boolean hangUp(Long roomId) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(HANG_UP,Boolean.class);
String currentTime = String.valueOf(System.currentTimeMillis() / 1000);
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), currentTime);
RScript script = redissonClient.getScript();
Long currentTime = System.currentTimeMillis() / 1000;
Boolean execute = script.eval(RScript.Mode.READ_WRITE, HANG_UP, RScript.ReturnType.BOOLEAN,
Collections.singletonList(getKey(roomId)), currentTime);
// DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(HANG_UP,Boolean.class);
// Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), currentTime);
return BooleanUtils.isTrue(execute);
}
public void setStatus(Long roomId, RoomStatusEnums status) {
String key = getKey(roomId);
stringRedisTemplate.opsForHash().put(key,"status",status.getCode());
// redisTemplate.opsForHash().put(key,"status",status.getCode());
RMap<String, Object> map = redissonClient.getMap(key);
map.put("status",status.getCode());
}
private final static String SET_STATUS_RECEIVER_CONNECTION = "local status = tonumber(redis.call('hget', KEYS[1], 'status'))\n" +
private final static String SET_STATUS_RECEIVER_CONNECTION = "local status = tonumber(redis.call('hget', KEYS[1], '\"status\"'))\n" +
"if status ~= 1 then\n" +
" return 0\n" +
"end\n" +
"return redis.call('hmset', KEYS[1], 'status', 3)";
"return redis.call('hmset', KEYS[1], '\"status\"', 3)";
public boolean setStatusReceiverConnection(Long roomId) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(SET_STATUS_RECEIVER_CONNECTION,Boolean.class);
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)));
RScript script = redissonClient.getScript();
Boolean execute = script.eval(RScript.Mode.READ_WRITE, SET_STATUS_RECEIVER_CONNECTION, RScript.ReturnType.BOOLEAN,
Collections.singletonList(getKey(roomId)));
// DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(SET_STATUS_RECEIVER_CONNECTION,Boolean.class);
// Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)));
return BooleanUtils.isTrue(execute);
}
private final static String SET_STATUS_AGREE =
"local status = tonumber(redis.call('hget', KEYS[1], 'status'))\n" +
"local beginTime = tonumber(redis.call('hget', KEYS[1], 'beginTime'))\n" +
"if status ~= 3 or beginTime > 0 then\n" +
"local status = tonumber(redis.call('hget', KEYS[1], '\"status\"'))\n" +
"local beginTime = tonumber(redis.call('hget', KEYS[1], '\"beginTime\"'))\n" +
"if status ~= 3 or (beginTime and beginTime > 0) then\n" +
" return 0\n" +
"end\n" +
"return redis.call('hmset', KEYS[1], 'status', 7, 'beginTime', ARGV[1])";
"return redis.call('hmset', KEYS[1], '\"status\"', 7, '\"beginTime\"', ARGV[1])";
public boolean setStatusAgree(Long roomId) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(SET_STATUS_AGREE,Boolean.class);
String currentTime = String.valueOf(System.currentTimeMillis() / 1000);
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),currentTime);
RScript script = redissonClient.getScript();
Boolean execute = script.eval(RScript.Mode.READ_WRITE, SET_STATUS_AGREE, RScript.ReturnType.BOOLEAN,
Collections.singletonList(getKey(roomId)),currentTime);
// DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(SET_STATUS_AGREE,Boolean.class);
// Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),currentTime);
return BooleanUtils.isTrue(execute);
}
private final static String INCS_BLACK_AMOUNT =
"redis.call('hIncrBy', KEYS[1], 'payCoin', ARGV[1])\n" +
"redis.call('hIncrBy', KEYS[1], 'payIncome', ARGV[2])";
"redis.call('hIncrBy', KEYS[1], '\"payCoin\"', ARGV[1])\n" +
"redis.call('hIncrBy', KEYS[1], '\"payIncome\"', ARGV[2])";
public boolean incsBlackAmount(Long roomId, Long decrCoin, Long decrIncomeCoin) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(INCS_BLACK_AMOUNT,Boolean.class);
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),
decrCoin,decrIncomeCoin);
RScript script = redissonClient.getScript();
Boolean execute = script.eval(RScript.Mode.READ_WRITE, INCS_BLACK_AMOUNT, RScript.ReturnType.BOOLEAN,
Collections.singletonList(getKey(roomId)),decrCoin.intValue(),decrIncomeCoin.intValue());
// DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(INCS_BLACK_AMOUNT,Boolean.class);
// Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),
// decrCoin,decrIncomeCoin);
return BooleanUtils.isTrue(execute);
}
}

View File

@@ -4,10 +4,13 @@ import com.alibaba.fastjson2.JSON;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.constant.UserDataConstant;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Map;
/**
@@ -19,6 +22,8 @@ import java.util.Map;
public class UserDataCache {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
public String getKey(Long roomId,int type){
return String.format(RedisConstant.USER_ROOM_DATA,roomId,type== UserDataConstant.TYPE_CALLER?"caller":"receiver");
@@ -34,18 +39,23 @@ public class UserDataCache {
public UserData getUserDataByRoom(Long roomId,int type){
String key = getKey(roomId, type);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
if(entries.get("roomId") == null){
// Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
RMap<Object, Object> entries = redissonClient.getMap(key);
Map<Object, Object> map = entries.readAllMap();
if(map.get("roomId") == null){
return null;
}
return JSON.parseObject(JSON.toJSONString(entries),UserData.class);
return JSON.parseObject(JSON.toJSONString(map),UserData.class);
}
public void init(UserData userData,int type){
String key = getKey(userData.getRoomId(), type);
userData.setUserType(type);
Map<String,Object> map = JSON.parseObject(JSON.toJSONString(userData));
redisTemplate.opsForHash().putAll(key,map);
// redisTemplate.opsForHash().putAll(key,map);
RMap<Object, Object> mapRedis = redissonClient.getMap(key);
mapRedis.putAll(map);
mapRedis.expire(Duration.ofDays(15));
}
public void initCaller(UserData callerUserData) {
@@ -58,6 +68,7 @@ public class UserDataCache {
public void hMSet(Long roomId,Integer userType,Map<String,Object> data) {
String key = getKey(roomId, userType);
redisTemplate.opsForHash().putAll(key,data);
// redisTemplate.opsForHash().putAll(key,data);
redissonClient.getMap(key).putAll(data);
}
}

View File

@@ -53,8 +53,8 @@ public class WsRMsgGen {
public static WsR hangup(String message, Long roomId, Integer hangUpType) {
Map<String,Object> map = new HashMap<>();
map.put("roomid","roomId");
map.put("type","hangUpType");
map.put("roomid",roomId);
map.put("type",hangUpType);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("hangup");
ok.setMsg(message);

View File

@@ -10,8 +10,8 @@ import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.dto.WsToken;
import com.ruoyi.cai.ws.service.CheckConnectionDTO;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.util.MapGetUtil;
import com.ruoyi.cai.ws.util.WsExceptionUtil;
import jodd.util.StringUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
@@ -36,6 +36,9 @@ public class MessageHandleApplication {
public void processOn(WebSocketSession session, TextMessage message) {
String payload = message.getPayload();
if(StringUtil.isEmpty(payload)){
return;
}
JSONObject jsonObject = JSON.parseObject(payload);
Object method = jsonObject.get("method");
if(method == null){

View File

@@ -1,11 +1,17 @@
package com.ruoyi.cai.ws.handler;
import cn.hutool.core.util.URLUtil;
import com.google.common.base.Splitter;
import com.ruoyi.cai.chat.ChatManager;
import com.ruoyi.cai.ws.cache.FdCtxDataCache;
import com.ruoyi.cai.ws.constant.WebSocketConstants;
import com.ruoyi.cai.ws.holder.WebSocketSessionHolder;
import com.ruoyi.cai.ws.processon.OpenLogic;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.util.WebSocketUtils;
import com.ruoyi.common.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.utils.URIUtils;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -14,6 +20,9 @@ import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import javax.websocket.server.PathParam;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
@@ -34,10 +43,27 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler {
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) {
URI uri = session.getUri();
Map<String, String> para = getPara(uri == null ? null : uri.getQuery());
Map<String, Object> attributes = session.getAttributes();
attributes.putAll(para);
openLogic.processOn(session);
log.info("[connect] sessionId: {},userId:{}", session.getId(), session.getId());
}
private Map<String,String> getPara(String uri) {
Map<String,String> map = new HashMap<>();
if(StringUtils.isEmpty(uri)){
return map;
}
String[] keys = uri.split("&");
for (String key : keys) {
String[] split = key.split("=");
map.put(split[0],split.length > 1 ? split[1] : "");
}
return map;
}
/**
* 处理发送来的文本消息
*
@@ -47,7 +73,11 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler {
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
messageHandleApplication.processOn(session,message);
try {
messageHandleApplication.processOn(session,message);
}catch (Exception e){
log.error("ws消息处理失败需要开发检查问题!",e);
}
}
@Override
@@ -79,6 +109,10 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler {
log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
}
@Autowired
private RoomService roomService;
@Autowired
private FdCtxDataCache fdCtxDataCache;
/**
* 连接关闭后
*
@@ -88,6 +122,7 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler {
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String token = String.valueOf(session.getAttributes().get("token"));
// TODO 连接关闭 是否要删除fd的关系
WebSocketSessionHolder.removeSession(token);
log.info("[disconnect] sessionId: {},token:{}", session.getId(), token);
}

View File

@@ -37,7 +37,7 @@ public class HangupMessageHandler extends AbstractMessageHandle implements IMess
return;
}
// 触发结算
settleService.processOn(room);
settleService.processOn(room.getRoomId());
Integer type = fdCtxData.isCaller() ? HangUpEnums.FROM.getCode() : HangUpEnums.TO.getCode();
sendToCurrent(fdCtxData,WsRMsgGen.hangup("您已挂断",room.getRoomId(), type));
sendToTar(fdCtxData,WsRMsgGen.hangup("对方已挂断",room.getRoomId(), type));

View File

@@ -9,18 +9,18 @@ import com.ruoyi.cai.trd.ImMsgGen;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.cache.RoomCtxCache;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.service.SettleService;
import com.ruoyi.yunxin.Yunxin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class CheckTimeOutJob {
@Autowired
private OnlineDataCache onlineDataCache;
@@ -29,13 +29,14 @@ public class CheckTimeOutJob {
@Autowired
private UserCallService userCallService;
@Autowired
private RoomCtxCache roomCtxCache;
private SettleService settleService;
@Autowired
private Yunxin yunxin;
/**
* 1 分钟执行一次
* 检查房间是不是三分钟没有接听, 需要自动挂断掉
*/
@Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS)
// @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS)
public void run(){
Set<String> roomIdStr = onlineDataCache.getAll();
for (String roomIdS : roomIdStr) {
@@ -45,7 +46,11 @@ public class CheckTimeOutJob {
}
public void deal(Long roomId){
log.info("开始执行房间超时检测,是否一直不接 roomId={}",roomId);
Room room = roomService.load(roomId);
if(room == null){
return;
}
if(!RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus())
&& !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){
return;
@@ -66,6 +71,7 @@ public class CheckTimeOutJob {
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
// TODO 需要放开主播的接听状态
roomService.closeAllFd(roomId);
ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0);
yunxin.sendToSync(receiverUserData.getId(),callUserData.getId(),imDataRes);

View File

@@ -41,7 +41,7 @@ public class HeartbeatJob {
/**
* 30秒执行一次
* 30秒执行一次 心跳只处理接听后的心跳
*/
@Scheduled(fixedDelay = 30,timeUnit = TimeUnit.SECONDS)
public void run(){
@@ -50,6 +50,9 @@ public class HeartbeatJob {
try {
Long roomId = Long.valueOf(roomIdS);
Room room = roomService.load(roomId);
if(room == null){
return;
}
if(!room.isCanCall()){
return;
}
@@ -69,7 +72,7 @@ public class HeartbeatJob {
userCallService.update(Wrappers.lambdaUpdate(UserCall.class)
.eq(UserCall::getId,roomId)
.set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode()));
settleService.processOn(room);
settleService.processOn(roomId);
}
List<String> keys = roomCtxCache.getSessionKeysByRoomId(roomId);
RoomWebSocketUtil.sendSendMessage(keys, hangup);

View File

@@ -25,17 +25,13 @@ public class SettleJob {
/**
* 每 1 分钟执行一次
*/
@Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS)
// @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS)
public void run(){
Set<String> all = onlineDataCache.getAll();
for (String roomIdS : all) {
try {
Long roomId = Long.valueOf(roomIdS);
Room room = roomService.load(roomId);
if(room == null){
return;
}
settleService.processOn(room);
settleService.processOn(roomId);
}catch (Exception e){
log.info("定时任务结算失败!",e);
}

View File

@@ -48,7 +48,7 @@ public class WebSocketManager {
return null;
}
Room room = roomService.load(roomId);
if(room == null){
if(room == null || !room.isCanCall()){
return null;
}
return room;
@@ -104,7 +104,6 @@ public class WebSocketManager {
RoomData roomData = new RoomData();
roomData.setRoomId(call.getId());
roomData.setCallPrice(call.getCallPrice());
roomData.setSkillName(call.getSkillName());
roomData.setStatus(call.getStatus());
roomData.setVideoDivide(call.getReceiverVideoDivide());
roomDataCache.init(roomData);
@@ -119,7 +118,7 @@ public class WebSocketManager {
receiveUserData.setRoomId(call.getId());
receiveUserData.setNickname(receiverUser.getNickname());
receiveUserData.setUserCode(receiverUser.getUsercode());
userDataCache.initReceiver(callerUserData);
userDataCache.initReceiver(receiveUserData);
callerRoomCache.addRoom(call.getFromUid(),call.getToUid(),call.getId());
return call.getId();

View File

@@ -73,7 +73,7 @@ public class OpenLogic {
Long roomId = wsToken.getRoomId();
Long userId = wsToken.getUserId();
Room room = roomService.load(roomId);
if(room == null || (room.getCallUserData().getId().equals(userId) && room.getReceiverUserData().getId().equals(userId))){
if(room == null || (!room.getCallUserData().getId().equals(userId) && !room.getReceiverUserData().getId().equals(userId))){
WsExceptionUtil.throwException(session,"房间不可用", HangUpEnums.OTHER,roomId);
return;
}

View File

@@ -138,8 +138,8 @@ public class RoomService {
if(beginTime == null){
return 0L;
}
if(roomData.getHangUpTime() != null){
return roomData.getHangUpTime() - roomData.getBeginTime();
if(roomData.getHangupTime() != null){
return roomData.getHangupTime() - roomData.getBeginTime();
}
return DateUtil.currentSeconds() - beginTime;
@@ -180,7 +180,7 @@ public class RoomService {
Long receiverId = receiverUserData.getId();
Long callerId = room.getCallUserData().getId();
// 呼叫方释放资源
callerRoomCache.delRoom(receiverId,roomId);
callerRoomCache.delRoom(callerId,receiverId,roomId);
userService.updateVideoStatus(callerId,0);
// 接收方释放资源(已连接的情况下)
if(receiverUserData.getConnectTime() != null && receiverUserData.getConnectTime() > 0){
@@ -195,11 +195,25 @@ public class RoomService {
}
// 修改释放状态
Map<String,Object> map = new HashMap<>();
map.put("releaseRes",1);
map.put("releaseRes",true);
roomDataCache.hMSet(roomId,map);
}catch (Exception e){
log.error("释放房间资源失败!房间号:{}",roomId,e);
}
}
/**
* 检查是否正在通话中
*/
public boolean checkRunningVideo(Long toUserId, Long fromUserId) {
Long roomId = callerRoomCache.getRoomId(toUserId, fromUserId);
if(roomId != null){
Room room = load(roomId);
if(room != null && room.isCanCall()){
return true;
}
}
return false;
}
}

View File

@@ -76,7 +76,7 @@ public class SettleService {
boolean b = roomService.hangUp(room.getRoomId());
if(b){
// 结算操作
settleService.processOn(room);
settleService.processOn(room.getRoomId());
}
// 向客户端发送挂断指令
WsR r = WsRMsgGen.hangup("拨打方余额不足", room.getRoomId(), HangUpEnums.NOTMONEY.getCode());
@@ -99,10 +99,14 @@ public class SettleService {
/**
* 结算处理
* @param room
*/
@SneakyThrows
public void processOn(Room room){
public void processOn(Long roomId){
Room room = roomService.load(roomId);
if(room == null){
log.warn("房间不存在,无需结算 roomId={}",roomId);
return;
}
String lock = LockManager.getVideoSettleLock(room.getRoomId());
RLock clientLock = redissonClient.getLock(lock);
boolean locked = clientLock.isLocked();
@@ -110,7 +114,7 @@ public class SettleService {
log.info("正在结算中,稍等!");
return;
}
boolean lockFlag = clientLock.tryLock(5, TimeUnit.SECONDS);
boolean lockFlag = clientLock.tryLock();
if(!lockFlag){
log.info("正在结算中,稍等!");
return;
@@ -133,7 +137,6 @@ public class SettleService {
}
if(!room.isReleaseRes()){ // 房间资源是否已经释放
roomService.releaseRes(room.getRoomId());
return;
}
Long roomId = room.getRoomId();
// 未通话,无需结算