From d5deaacfe819e4f0938facedc2a243adc8ff4aab Mon Sep 17 00:00:00 2001 From: dute7liang <383200134@qq.com> Date: Sun, 14 Jan 2024 03:38:50 +0800 Subject: [PATCH] init --- .../main/java/com/ruoyi/RuoYiApplication.java | 2 + .../src/main/resources/application-dev.yml | 2 +- .../src/main/resources/application.yml | 4 +- .../java/com/ruoyi/cai/chat/ChatManager.java | 40 ++++++--- .../controller/admin/MqControllerTest.java | 29 +++++++ .../java/com/ruoyi/cai/domain/UserCall.java | 3 +- .../com/ruoyi/cai/enums/SystemConfigEnum.java | 2 +- .../java/com/ruoyi/cai/mq/AmqpProducer.java | 4 +- .../ruoyi/cai/mq/CheckTimeOutMqConfig.java | 2 +- .../com/ruoyi/cai/mq/CommonDelayMqConfig.java | 8 +- .../mq/consumer/CheckTimeOutMqConsumer.java | 15 ++-- .../cai/service/impl/AccountServiceImpl.java | 8 +- .../cai/service/impl/UserCallServiceImpl.java | 2 + .../cai/service/impl/UserServiceImpl.java | 7 +- .../java/com/ruoyi/cai/ws/bean/RoomData.java | 2 +- .../ruoyi/cai/ws/cache/CallerRoomCache.java | 36 +++++--- .../ruoyi/cai/ws/cache/FdCtxDataCache.java | 19 ++--- .../ruoyi/cai/ws/cache/OnlineDataCache.java | 16 +++- .../com/ruoyi/cai/ws/cache/RoomCtxCache.java | 28 ++++-- .../com/ruoyi/cai/ws/cache/RoomDataCache.java | 85 +++++++++++++------ .../com/ruoyi/cai/ws/cache/UserDataCache.java | 21 +++-- .../java/com/ruoyi/cai/ws/dto/WsRMsgGen.java | 4 +- .../ws/handler/MessageHandleApplication.java | 5 +- .../cai/ws/handler/RoomWebSocketHandler.java | 37 +++++++- .../handler/message/HangupMessageHandler.java | 2 +- .../com/ruoyi/cai/ws/job/CheckTimeOutJob.java | 18 ++-- .../com/ruoyi/cai/ws/job/HeartbeatJob.java | 7 +- .../java/com/ruoyi/cai/ws/job/SettleJob.java | 8 +- .../cai/ws/manager/WebSocketManager.java | 5 +- .../com/ruoyi/cai/ws/processon/OpenLogic.java | 2 +- .../com/ruoyi/cai/ws/service/RoomService.java | 22 ++++- .../ruoyi/cai/ws/service/SettleService.java | 13 +-- 32 files changed, 323 insertions(+), 135 deletions(-) create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/controller/admin/MqControllerTest.java diff --git a/ruoyi-admin/src/main/java/com/ruoyi/RuoYiApplication.java b/ruoyi-admin/src/main/java/com/ruoyi/RuoYiApplication.java index b52d32d6..fa121ccc 100644 --- a/ruoyi-admin/src/main/java/com/ruoyi/RuoYiApplication.java +++ b/ruoyi-admin/src/main/java/com/ruoyi/RuoYiApplication.java @@ -3,6 +3,7 @@ package com.ruoyi; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; +import org.springframework.scheduling.annotation.EnableScheduling; /** * 启动程序 @@ -11,6 +12,7 @@ import org.springframework.boot.context.metrics.buffering.BufferingApplicationSt */ @SpringBootApplication +@EnableScheduling public class RuoYiApplication { public static void main(String[] args) { diff --git a/ruoyi-admin/src/main/resources/application-dev.yml b/ruoyi-admin/src/main/resources/application-dev.yml index 06bcc3ed..eb022768 100644 --- a/ruoyi-admin/src/main/resources/application-dev.yml +++ b/ruoyi-admin/src/main/resources/application-dev.yml @@ -168,7 +168,7 @@ spring: username: admin # 账号 password: 383200134 # 密码 port: 5672 - virtual-host: /cai + virtual-host: /cai-dev agora: app-id: app key: 627b8e17b0c616c1346cba2d87e10251 diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml index 53f6f3c1..f2a02168 100644 --- a/ruoyi-admin/src/main/resources/application.yml +++ b/ruoyi-admin/src/main/resources/application.yml @@ -187,7 +187,6 @@ mybatis-plus: updateStrategy: NOT_NULL # 字段验证策略之 select,在 select 的时候的字段验证策略既 wrapper 根据内部 entity 生成的 where 条件 where-strategy: NOT_NULL - # 数据加密 mybatis-encryptor: # 是否开启加密 @@ -233,7 +232,8 @@ thread-pool: queueCapacity: 128 # 线程池维护线程所允许的空闲时间 keepAliveSeconds: 300 - +websocket: + enabled: true --- # 分布式锁 lock4j 全局配置 lock4j: # 获取分布式锁超时时间,默认为 3000 毫秒 diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/chat/ChatManager.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/chat/ChatManager.java index 5c9ca743..5a317685 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/chat/ChatManager.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/chat/ChatManager.java @@ -1,7 +1,6 @@ package com.ruoyi.cai.chat; -import cn.hutool.core.map.MapUtil; -import com.ruoyi.cai.config.CaiProperties; +import com.ruoyi.cai.domain.Account; import com.ruoyi.cai.domain.Anchor; import com.ruoyi.cai.domain.User; import com.ruoyi.cai.domain.UserCall; @@ -12,10 +11,7 @@ import com.ruoyi.cai.dto.app.vo.chat.GetRoomResp; import com.ruoyi.cai.enums.SystemConfigEnum; import com.ruoyi.cai.manager.IdManager; import com.ruoyi.cai.manager.SystemConfigManager; -import com.ruoyi.cai.service.AnchorService; -import com.ruoyi.cai.service.GuardTotalService; -import com.ruoyi.cai.service.UserCallService; -import com.ruoyi.cai.service.UserService; +import com.ruoyi.cai.service.*; import com.ruoyi.cai.ws.bean.Room; import com.ruoyi.cai.ws.constant.RedisConstant; import com.ruoyi.cai.ws.dto.WsToken; @@ -23,15 +19,16 @@ import com.ruoyi.cai.ws.manager.WebSocketManager; import com.ruoyi.cai.ws.util.MapGetUtil; import com.ruoyi.common.exception.ServiceException; import com.ruoyi.common.helper.LoginHelper; +import org.redisson.api.RMap; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -48,11 +45,11 @@ public class ChatManager { @Autowired private SystemConfigManager systemConfigManager; @Autowired - private CaiProperties properties; - @Autowired private GuardTotalService guardTotalService; @Autowired - private StringRedisTemplate redisTemplate; + private RedissonClient redissonClient; + @Autowired + private AccountService accountService; private String setWsToken(Long roomId,Long fromUid,Long toUid,Long userId){ String token = IdManager.nextIdStr(); @@ -62,14 +59,16 @@ public class ChatManager { map.put("fromUid",fromUid); map.put("toUid",toUid); map.put("userId",userId); - redisTemplate.opsForHash().putAll(tokenKey,map); - redisTemplate.expire(tokenKey,1, TimeUnit.DAYS); + RMap mapRedis = redissonClient.getMap(tokenKey); + mapRedis.putAll(map); + mapRedis.expire(Duration.ofDays(1)); return token; } public WsToken getToken(String token){ String tokenKey = String.format(RedisConstant.WS_TOKEN, token); - Map entries = redisTemplate.opsForHash().entries(tokenKey); + RMap mapRedis = redissonClient.getMap(tokenKey); + Map entries = mapRedis.readAllMap(); if(entries.isEmpty()){ return null; } @@ -85,6 +84,9 @@ public class ChatManager { Long userId = LoginHelper.getUserId(); User fromUser = userService.getById(userId); User toUser = userService.getById(callReq.getToUid()); + if(toUser == null){ + throw new ServiceException("主播不存在"); + } if(toUser.getIsAnchor() != 1){ throw new ServiceException("对方未通过女神认证,不能接听视频"); } @@ -95,11 +97,21 @@ public class ChatManager { if(anchor == null){ throw new ServiceException("主播技能不存在"); } + Account account = accountService.getByUserId(userId); + if(account == null){ + throw new ServiceException("账户异常,请联系客服"); + } + long coin = account.getIncomeCoin() + account.getCoin(); + if(coin < anchor.getPrice()*2){ + throw new ServiceException("您的余额不足,请充值"); + } Long roomId = null; Room room = webSocketManager.checkOnlineRoom(fromUser.getId(), toUser.getId()); if(room == null){ UserCall call = userCallService.createCall(fromUser, toUser, anchor); roomId = webSocketManager.createRoom(call.getId()); + }else{ + roomId = room.getRoomId(); } String wsSocketUrlFormat = systemConfigManager.getSystemConfig(SystemConfigEnum.WS_SOCKET_URL); String token = setWsToken(roomId, fromUser.getId(), toUser.getId(),userId); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/admin/MqControllerTest.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/admin/MqControllerTest.java new file mode 100644 index 00000000..abb2795c --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/admin/MqControllerTest.java @@ -0,0 +1,29 @@ +package com.ruoyi.cai.controller.admin; + +import com.ruoyi.cai.mq.AmqpProducer; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@Validated +@RequiredArgsConstructor +@RestController +@RequestMapping("/cai/mq/test") +public class MqControllerTest { + + @Autowired + private AmqpProducer amqpProducer; + + @GetMapping("/send") + public void send(String message,Integer time){ + amqpProducer.sendCheckTimeOut(message,time); + } + + @GetMapping("/send2") + public void send(String message){ + amqpProducer.sendCalculateSales(message); + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/domain/UserCall.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/domain/UserCall.java index d9354ef7..7c3d7b30 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/domain/UserCall.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/domain/UserCall.java @@ -48,7 +48,6 @@ public class UserCall implements Serializable { * 结束通话时间 */ private LocalDateTime endTime; - private String skillName; /** * 通话时长 */ @@ -87,6 +86,8 @@ public class UserCall implements Serializable { @Deprecated private Integer freeNum; + private Long traceId; + private LocalDateTime createTime; } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/SystemConfigEnum.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/SystemConfigEnum.java index b4953918..1aac4c09 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/SystemConfigEnum.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/SystemConfigEnum.java @@ -28,7 +28,7 @@ public enum SystemConfigEnum { SMS_CODE_ADMIN("", "万能验证码",SystemConfigGroupEnum.SYSTEM), PASSWORD_ADMIN("", "公用密码",SystemConfigGroupEnum.SYSTEM), SYSTEM_CUSTOMER_SERVICE("", "系统客服",SystemConfigGroupEnum.SYSTEM), - WS_SOCKET_URL("ws://localhost:8080?token=%s&room_id=%s", "ws通讯地址",SystemConfigGroupEnum.SYSTEM), + WS_SOCKET_URL("ws://localhost:8080/ws?token=%s&room_id=%s", "ws通讯地址",SystemConfigGroupEnum.SYSTEM), ; diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java index 69f0d84e..8d757954 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java @@ -29,8 +29,8 @@ public class AmqpProducer { CommonDelayDto dto = new CommonDelayDto(); dto.setType(type); dto.setRoomId(roomId); - rabbitTemplate.convertAndSend(CheckTimeOutMqConfig.EXCHANGE_NAME, - CheckTimeOutMqConfig.ROUTING_KEY, + rabbitTemplate.convertAndSend(CommonDelayMqConfig.EXCHANGE_NAME, + CommonDelayMqConfig.ROUTING_KEY, JSON.toJSONString(dto), messagePostProcessor -> { messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒 diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java index 17edd4ae..743154f2 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CheckTimeOutMqConfig.java @@ -34,7 +34,7 @@ public class CheckTimeOutMqConfig { @Bean public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) { - return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs(); + return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs(); } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java index 11b9205b..ee9bf4b0 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java @@ -15,7 +15,7 @@ public class CommonDelayMqConfig { public static final String ROUTING_KEY = "commonDelayRouting"; @Bean - public CustomExchange delayedExchange() { + public CustomExchange commonDelayedExchange() { HashMap args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(EXCHANGE_NAME, @@ -26,15 +26,15 @@ public class CommonDelayMqConfig { } @Bean - public Queue delayedQueue() { + public Queue commonDelayedQueue() { return QueueBuilder.durable(QUEUE_NAME) .withArgument("x-delayed-type", "direct") .build(); } @Bean - public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) { - return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs(); + public Binding commonDelayedBinding(Queue commonDelayedQueue,CustomExchange commonDelayedExchange) { + return BindingBuilder.bind(commonDelayedQueue()).to(commonDelayedExchange()).with(ROUTING_KEY).noargs(); } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java index 4e501e81..77097bd7 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java @@ -20,11 +20,16 @@ public class CheckTimeOutMqConsumer { @RabbitListener(queues = CheckTimeOutMqConfig.QUEUE_NAME ,containerFactory = "customContainerFactory") public void checkTimeOutMq(String message) { - log.info("checkTimeOutMq: " + message); - boolean next = settleService.withholdingFee(Long.valueOf(message)); - if(next){ - // 1分钟后继续执行 - amqpProducer.sendCheckTimeOut(message,60); + log.info("开始执行预扣费: " + message); + try { + boolean next = settleService.withholdingFee(Long.valueOf(message)); + if(next){ + // 1分钟后继续执行 + amqpProducer.sendCheckTimeOut(message,60); + } + }catch (Exception e){ + log.error("每分钟定时扣费失败!",e); } + } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/AccountServiceImpl.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/AccountServiceImpl.java index 5493c7ef..4d96bf80 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/AccountServiceImpl.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/AccountServiceImpl.java @@ -282,7 +282,7 @@ public class AccountServiceImpl extends ServiceImpl impl Long callPrice = roomData.getCallPrice(); Long callTime = roomService.getCallTime(room); // 本次支付金额 - long totalAmount = callPrice * (callTime / 60); // 本次需要支付的金额 + long totalAmount = callPrice * ((callTime / 60) + 1); // 本次需要支付的金额 Long payCoin = roomData.getPayCoin(); // 已经支付的余额 Long payIncome = roomData.getPayIncome(); // 已经支付的收益 // 补差价 @@ -357,9 +357,10 @@ public class AccountServiceImpl extends ServiceImpl impl consumeLogService.save(consumeLog); userCallService.update(Wrappers.lambdaUpdate(UserCall.class) .eq(UserCall::getId, userCall.getId()) + .set(UserCall::getTraceId,tractId) .set(UserCall::getBeginTime, DateUtil.date(roomData.getBeginTime()).toLocalDateTime()) - .set(UserCall::getEndTime,DateUtil.date(roomData.getHangUpTime()).toLocalDateTime()) - .set(UserCall::getCallTime,roomData.getHangUpTime() - roomData.getBeginTime()) + .set(UserCall::getEndTime,DateUtil.date(roomData.getHangupTime()).toLocalDateTime()) + .set(UserCall::getCallTime,roomData.getHangupTime() - roomData.getBeginTime()) .set(UserCall::getCallAmount,amountReal) .set(UserCall::getCallIncome, anchorAmount) .set(UserCall::getStatus, RoomStatusEnums.STATUS_HANGUP.getCode())); @@ -367,6 +368,7 @@ public class AccountServiceImpl extends ServiceImpl impl resp.setConsumeLog(consumeLog); resp.setPayIncome(payIncome); resp.setPayCoin(payCoin); + resp.setAnchorIncome(anchorAmount); return resp; } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserCallServiceImpl.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserCallServiceImpl.java index 24abf0b3..632be0dc 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserCallServiceImpl.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserCallServiceImpl.java @@ -47,7 +47,9 @@ public class UserCallServiceImpl extends ServiceImpl i } UserCall userCall = new UserCall(); userCall.setFromUid(fromUid); + userCall.setFromUsercode(fromUser.getUsercode()); userCall.setToUid(toUser.getId()); + userCall.setToUsercode(toUser.getUsercode()); userCall.setCallPrice(anchor.getPrice()); userCall.setReceiverVideoDivide(anchor.getVideoRate()); boolean save = this.save(userCall); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserServiceImpl.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserServiceImpl.java index 1a58690a..5efae200 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserServiceImpl.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserServiceImpl.java @@ -13,6 +13,7 @@ import com.ruoyi.cai.dto.app.vo.user.UserListVo; import com.ruoyi.cai.dto.app.vo.user.OnlineStatusVo; import com.ruoyi.cai.mapper.UserMapper; import com.ruoyi.cai.service.*; +import com.ruoyi.cai.ws.service.RoomService; import com.ruoyi.common.core.domain.PageQuery; import com.ruoyi.common.exception.ServiceException; import com.ruoyi.common.helper.LoginHelper; @@ -61,6 +62,8 @@ public class UserServiceImpl extends ServiceImpl implements Us private UserForbidService userForbidService; @Autowired private AnchorApplyService anchorApplyService; + @Autowired + private RoomService roomService; @Override public User getByUsername(String username) { @@ -114,7 +117,9 @@ public class UserServiceImpl extends ServiceImpl implements Us // 在线状态 OnlineStatusVo onlineStatusVo; if(user.getIsAnchor() == 1 && anchor != null){ - onlineStatusVo = userOnlineService.onlineStatus(userId,anchor.getOpenVideoStatus(),anchor.getVideoStatus(), user.getIsAnchor()); + boolean runningVideo = roomService.checkRunningVideo(userId, anchor.getUserId()); + onlineStatusVo = userOnlineService.onlineStatus(userId,anchor.getOpenVideoStatus(), + runningVideo?1:anchor.getVideoStatus(), user.getIsAnchor()); }else{ onlineStatusVo = userOnlineService.onlineStatus(userId,0,0, user.getIsAnchor()); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java index 0e00820c..06792f5c 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java @@ -14,7 +14,7 @@ public class RoomData { private BigDecimal videoDivide; private Long payCoin = 0L; private Long payIncome = 0L; - private Long hangUpTime; // 结束时间 + private Long hangupTime; // 结束时间 private Long settleTime; // 结算时间 diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/CallerRoomCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/CallerRoomCache.java index 65b4ec50..8cc5e173 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/CallerRoomCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/CallerRoomCache.java @@ -2,19 +2,23 @@ package com.ruoyi.cai.ws.cache; import com.ruoyi.cai.ws.constant.RedisConstant; import org.apache.commons.lang3.BooleanUtils; +import org.redisson.api.RMap; +import org.redisson.api.RScript; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.Collections; import java.util.Map; -import java.util.concurrent.TimeUnit; @Component public class CallerRoomCache { @Autowired private StringRedisTemplate redisTemplate; + @Autowired + private RedissonClient redissonClient; public String getKey(Long fromUserId){ return String.format(RedisConstant.CALLER_ROOM_DATA,fromUserId); @@ -22,36 +26,46 @@ public class CallerRoomCache { public Long getRoomId(Long fromUserId, Long toUserId){ String key = getKey(fromUserId); - Object roomId = redisTemplate.opsForHash().get(key, toUserId); + RMap map = redissonClient.getMap(key); + Object roomId = map.get(toUserId+""); return roomId == null ? null : Long.valueOf(roomId.toString()); } public void addRoom(Long fromUid, Long toUid, Long roomId) { String key = getKey(fromUid); - redisTemplate.opsForHash().put(key,toUid,roomId); - redisTemplate.expire(key,7, TimeUnit.DAYS); + RMap map = redissonClient.getMap(key); + map.put(toUid+"",roomId+""); + map.expire(Duration.ofDays(7)); +// map.expire(7,TimeUnit.DAYS); +// redisTemplate.opsForHash().put(key,toUid,roomId); +// redisTemplate.expire(key,7, TimeUnit.DAYS); } public Map getAll(Long fromUid){ String key = getKey(fromUid); - return redisTemplate.opsForHash().entries(key); + RMap map = redissonClient.getMap(key); + return map.readAllMap(); +// return redisTemplate.opsForHash().entries(key); } public void del(Long fromUid) { String key = getKey(fromUid); - redisTemplate.delete(key); + redissonClient.getMap(key).delete(); } - private final static String DEL_ROOM_LUA = "return KEYS[1]\n" + + private final static String DEL_ROOM_LUA = "local r = tonumber(redis.call('hget',KEYS[1],ARGV[1]))\n" + "if r == tonumber(ARGV[2]) then\n" + " return redis.call('hdel',KEYS[1],ARGV[1])\n" + "end\n" + "return 0"; - public boolean delRoom(Long receiverId, Long roomId) { - DefaultRedisScript redisScript = new DefaultRedisScript<>(DEL_ROOM_LUA,Boolean.class); - Boolean execute = redisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), receiverId,roomId); + public boolean delRoom(Long callerId, Long receiverId, Long roomId) { + RScript script = redissonClient.getScript(); + Boolean execute = script.eval(RScript.Mode.READ_WRITE, DEL_ROOM_LUA, RScript.ReturnType.BOOLEAN, + Collections.singletonList(getKey(callerId)), receiverId+"", roomId+""); +// DefaultRedisScript redisScript = new DefaultRedisScript<>(DEL_ROOM_LUA,Boolean.class); +// Boolean execute = redisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), receiverId,roomId); return BooleanUtils.isTrue(execute); } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/FdCtxDataCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/FdCtxDataCache.java index 6d981914..b0080dc1 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/FdCtxDataCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/FdCtxDataCache.java @@ -1,12 +1,10 @@ package com.ruoyi.cai.ws.cache; -import com.alibaba.fastjson2.JSON; import com.ruoyi.cai.ws.bean.FdCtxData; import com.ruoyi.cai.ws.constant.RedisConstant; -import org.apache.commons.lang3.StringUtils; +import org.redisson.api.RBucket; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.SearchStrategy; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @@ -21,24 +19,21 @@ import java.util.concurrent.TimeUnit; @Component public class FdCtxDataCache { @Autowired - private StringRedisTemplate redisTemplate; + private RedissonClient redissonClient; public String getKey(String sessionKey){ return String.format(RedisConstant.FDCTX_DATA,sessionKey); } public void save(FdCtxData fdCtxData) { - String str = JSON.toJSONString(fdCtxData); - redisTemplate.opsForValue().set(getKey(fdCtxData.getSessionKey()),str,5, TimeUnit.DAYS); + RBucket bucket = redissonClient.getBucket(getKey(fdCtxData.getSessionKey())); + bucket.set(fdCtxData,5,TimeUnit.DAYS); } public FdCtxData getByRoomId(String sessionKey){ String key = getKey(sessionKey); - String s = redisTemplate.opsForValue().get(key); - if(StringUtils.isEmpty(s)){ - return null; - } - return JSON.parseObject(s,FdCtxData.class); + RBucket bucket = redissonClient.getBucket(key); + return bucket.get(); } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java index 8ae34c8b..4ae9bece 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java @@ -1,6 +1,8 @@ package com.ruoyi.cai.ws.cache; import com.ruoyi.cai.ws.constant.RedisConstant; +import org.redisson.api.RSet; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @@ -12,20 +14,28 @@ import java.util.Set; public class OnlineDataCache { @Autowired private StringRedisTemplate redisTemplate; + @Autowired + private RedissonClient redissonClient; public String getKey(){ return RedisConstant.ONLINE_ROOM_DATA; } public Set getAll(){ - return redisTemplate.opsForSet().members(getKey()); + RSet set = redissonClient.getSet(getKey()); + return set.readAll(); +// return redisTemplate.opsForSet().members(getKey()); } public void add(Long roomId){ - redisTemplate.opsForSet().add(getKey(), String.valueOf(roomId)); + RSet set = redissonClient.getSet(getKey()); + set.add(String.valueOf(roomId)); +// redisTemplate.opsForSet().add(getKey(), String.valueOf(roomId)); } public void remove(Long roomId) { - redisTemplate.opsForSet().remove(getKey(),roomId); + RSet set = redissonClient.getSet(getKey()); + set.delete(); +// redisTemplate.opsForSet().remove(getKey(),roomId); } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomCtxCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomCtxCache.java index a77460c5..98f13d5e 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomCtxCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomCtxCache.java @@ -3,13 +3,18 @@ package com.ruoyi.cai.ws.cache; import com.ruoyi.cai.ws.constant.RedisConstant; import com.ruoyi.cai.ws.constant.UserDataConstant; import com.ruoyi.cai.ws.holder.WebSocketSessionHolder; +import org.redisson.api.RMap; +import org.redisson.api.RSet; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -22,6 +27,8 @@ import java.util.concurrent.TimeUnit; public class RoomCtxCache { @Autowired private StringRedisTemplate redisTemplate; + @Autowired + private RedissonClient redissonClient; public String getKey(Long roomId){ return String.format(RedisConstant.FDCTX_ROOM_DATA,roomId); @@ -29,23 +36,28 @@ public class RoomCtxCache { public void addFd(String sessionKey,Long roomId,Integer userType){ String key = getKey(roomId); - redisTemplate.opsForHash().put(key,sessionKey,userType); - redisTemplate.expire(key,7, TimeUnit.DAYS); + RMap map = redissonClient.getMap(key); + map.put(sessionKey,userType); + map.expire(Duration.ofDays(7)); } public List getSessionKeysByRoomId(Long roomId){ String key = getKey(roomId); - Map entries = redisTemplate.opsForHash().entries(key); - List res = new ArrayList<>(); - for (Object o : entries.keySet()) { - res.add(String.valueOf(o)); + RMap entries = redissonClient.getMap(key); + Set set = entries.readAllKeySet(); + List keys = new ArrayList<>(); + for (String sessionKey : set) { + if(WebSocketSessionHolder.existSession(sessionKey)){ + keys.add(sessionKey); + } } - return res; + return keys; } public String getSessionKeyByRoomIdAndUserType(Long roomId,Integer userType){ String key = getKey(roomId); - Map entries = redisTemplate.opsForHash().entries(key); + RMap map = redissonClient.getMap(key); + Map entries = map.readAllMap(); for (Map.Entry entry : entries.entrySet()) { String sessionKey = String.valueOf(entry.getKey()); Integer userTypeK = Integer.valueOf(entry.getValue().toString()); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomDataCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomDataCache.java index 94e852c7..da3c5a9c 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomDataCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomDataCache.java @@ -6,12 +6,16 @@ import com.ruoyi.cai.ws.bean.RoomData; import com.ruoyi.cai.ws.constant.RedisConstant; import com.ruoyi.cai.ws.constant.RoomStatusEnums; import org.apache.commons.lang3.BooleanUtils; +import org.redisson.api.RMap; +import org.redisson.api.RScript; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.Collections; import java.util.Map; @@ -26,6 +30,8 @@ public class RoomDataCache { @Autowired private RedissonClient redissonClient; @Autowired + private RedisTemplate redisTemplate; + @Autowired private StringRedisTemplate stringRedisTemplate; public String getKey(Long roomId){ @@ -33,82 +39,105 @@ public class RoomDataCache { } public RoomData getByRoomId(Long roomId){ - Map map = stringRedisTemplate.opsForHash().entries(getKey(roomId)); - if(map.get("roomId") == null){ + RMap map = redissonClient.getMap(getKey(roomId)); + Map allMap = map.readAllMap(); +// Map map = redisTemplate.opsForHash().entries(getKey(roomId)); + if(allMap.get("roomId") == null){ return null; } - return JSON.parseObject(JSON.toJSONString(map),RoomData.class); + return JSON.parseObject(JSON.toJSONString(allMap),RoomData.class); } public void init(RoomData roomData) { Map map = JSON.parseObject(JSON.toJSONString(roomData)); - stringRedisTemplate.opsForHash().putAll(getKey(roomData.getRoomId()),map); +// redisTemplate.opsForHash().putAll(getKey(roomData.getRoomId()),map); + RMap mapRedis = redissonClient.getMap(getKey(roomData.getRoomId())); + mapRedis.putAll(map); + mapRedis.expire(Duration.ofDays(15)); } public void hMSet(Long roomId,Map data) { String key = getKey(roomId); - stringRedisTemplate.opsForHash().putAll(key,data); +// redisTemplate.opsForHash().putAll(key,data); + RMap map = redissonClient.getMap(key); + map.putAll(data); } public void hMSet(Long roomId,String mKey,Object mData) { String key = getKey(roomId); - stringRedisTemplate.opsForHash().put(key,mKey,mData); + RMap map = redissonClient.getMap(key); + map.put(mKey,mData); +// redisTemplate.opsForHash().put(key,mKey,mData); } - private final static String HANG_UP = "local hangupTime = tonumber(redis.call('hGet', KEYS[1], 'hangupTime'))\n" + - "if hangupTime > 0 then\n" + + private final static String HANG_UP = "local hangupTime = tonumber(redis.call('hGet', KEYS[1], '\"hangupTime\"'))\n" + + "if hangupTime and hangupTime > 0 then\n" + " return 0\n" + "end\n" + - "return redis.call('hMSet', KEYS[1], 'status', 8, 'hangupTime', ARGV[1])"; + "return redis.call('hMSet', KEYS[1], '\"status\"', 8, '\"hangupTime\"', ARGV[1])"; public boolean hangUp(Long roomId) { - DefaultRedisScript redisScript = new DefaultRedisScript<>(HANG_UP,Boolean.class); - String currentTime = String.valueOf(System.currentTimeMillis() / 1000); - Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), currentTime); + RScript script = redissonClient.getScript(); + Long currentTime = System.currentTimeMillis() / 1000; + Boolean execute = script.eval(RScript.Mode.READ_WRITE, HANG_UP, RScript.ReturnType.BOOLEAN, + Collections.singletonList(getKey(roomId)), currentTime); +// DefaultRedisScript redisScript = new DefaultRedisScript<>(HANG_UP,Boolean.class); +// Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), currentTime); return BooleanUtils.isTrue(execute); } public void setStatus(Long roomId, RoomStatusEnums status) { String key = getKey(roomId); - stringRedisTemplate.opsForHash().put(key,"status",status.getCode()); +// redisTemplate.opsForHash().put(key,"status",status.getCode()); + RMap map = redissonClient.getMap(key); + map.put("status",status.getCode()); } - private final static String SET_STATUS_RECEIVER_CONNECTION = "local status = tonumber(redis.call('hget', KEYS[1], 'status'))\n" + + private final static String SET_STATUS_RECEIVER_CONNECTION = "local status = tonumber(redis.call('hget', KEYS[1], '\"status\"'))\n" + "if status ~= 1 then\n" + " return 0\n" + "end\n" + - "return redis.call('hmset', KEYS[1], 'status', 3)"; + "return redis.call('hmset', KEYS[1], '\"status\"', 3)"; public boolean setStatusReceiverConnection(Long roomId) { - DefaultRedisScript redisScript = new DefaultRedisScript<>(SET_STATUS_RECEIVER_CONNECTION,Boolean.class); - Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId))); + RScript script = redissonClient.getScript(); + Boolean execute = script.eval(RScript.Mode.READ_WRITE, SET_STATUS_RECEIVER_CONNECTION, RScript.ReturnType.BOOLEAN, + Collections.singletonList(getKey(roomId))); +// DefaultRedisScript redisScript = new DefaultRedisScript<>(SET_STATUS_RECEIVER_CONNECTION,Boolean.class); +// Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId))); return BooleanUtils.isTrue(execute); } private final static String SET_STATUS_AGREE = - "local status = tonumber(redis.call('hget', KEYS[1], 'status'))\n" + - "local beginTime = tonumber(redis.call('hget', KEYS[1], 'beginTime'))\n" + - "if status ~= 3 or beginTime > 0 then\n" + + "local status = tonumber(redis.call('hget', KEYS[1], '\"status\"'))\n" + + "local beginTime = tonumber(redis.call('hget', KEYS[1], '\"beginTime\"'))\n" + + "if status ~= 3 or (beginTime and beginTime > 0) then\n" + " return 0\n" + "end\n" + - "return redis.call('hmset', KEYS[1], 'status', 7, 'beginTime', ARGV[1])"; + "return redis.call('hmset', KEYS[1], '\"status\"', 7, '\"beginTime\"', ARGV[1])"; public boolean setStatusAgree(Long roomId) { - DefaultRedisScript redisScript = new DefaultRedisScript<>(SET_STATUS_AGREE,Boolean.class); String currentTime = String.valueOf(System.currentTimeMillis() / 1000); - Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),currentTime); + RScript script = redissonClient.getScript(); + Boolean execute = script.eval(RScript.Mode.READ_WRITE, SET_STATUS_AGREE, RScript.ReturnType.BOOLEAN, + Collections.singletonList(getKey(roomId)),currentTime); +// DefaultRedisScript redisScript = new DefaultRedisScript<>(SET_STATUS_AGREE,Boolean.class); +// Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),currentTime); return BooleanUtils.isTrue(execute); } private final static String INCS_BLACK_AMOUNT = - "redis.call('hIncrBy', KEYS[1], 'payCoin', ARGV[1])\n" + - "redis.call('hIncrBy', KEYS[1], 'payIncome', ARGV[2])"; + "redis.call('hIncrBy', KEYS[1], '\"payCoin\"', ARGV[1])\n" + + "redis.call('hIncrBy', KEYS[1], '\"payIncome\"', ARGV[2])"; public boolean incsBlackAmount(Long roomId, Long decrCoin, Long decrIncomeCoin) { - DefaultRedisScript redisScript = new DefaultRedisScript<>(INCS_BLACK_AMOUNT,Boolean.class); - Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), - decrCoin,decrIncomeCoin); + RScript script = redissonClient.getScript(); + Boolean execute = script.eval(RScript.Mode.READ_WRITE, INCS_BLACK_AMOUNT, RScript.ReturnType.BOOLEAN, + Collections.singletonList(getKey(roomId)),decrCoin.intValue(),decrIncomeCoin.intValue()); +// DefaultRedisScript redisScript = new DefaultRedisScript<>(INCS_BLACK_AMOUNT,Boolean.class); +// Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), +// decrCoin,decrIncomeCoin); return BooleanUtils.isTrue(execute); } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/UserDataCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/UserDataCache.java index cc8da757..d5070344 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/UserDataCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/UserDataCache.java @@ -4,10 +4,13 @@ import com.alibaba.fastjson2.JSON; import com.ruoyi.cai.ws.bean.UserData; import com.ruoyi.cai.ws.constant.RedisConstant; import com.ruoyi.cai.ws.constant.UserDataConstant; +import org.redisson.api.RMap; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.Map; /** @@ -19,6 +22,8 @@ import java.util.Map; public class UserDataCache { @Autowired private StringRedisTemplate redisTemplate; + @Autowired + private RedissonClient redissonClient; public String getKey(Long roomId,int type){ return String.format(RedisConstant.USER_ROOM_DATA,roomId,type== UserDataConstant.TYPE_CALLER?"caller":"receiver"); @@ -34,18 +39,23 @@ public class UserDataCache { public UserData getUserDataByRoom(Long roomId,int type){ String key = getKey(roomId, type); - Map entries = redisTemplate.opsForHash().entries(key); - if(entries.get("roomId") == null){ +// Map entries = redisTemplate.opsForHash().entries(key); + RMap entries = redissonClient.getMap(key); + Map map = entries.readAllMap(); + if(map.get("roomId") == null){ return null; } - return JSON.parseObject(JSON.toJSONString(entries),UserData.class); + return JSON.parseObject(JSON.toJSONString(map),UserData.class); } public void init(UserData userData,int type){ String key = getKey(userData.getRoomId(), type); userData.setUserType(type); Map map = JSON.parseObject(JSON.toJSONString(userData)); - redisTemplate.opsForHash().putAll(key,map); +// redisTemplate.opsForHash().putAll(key,map); + RMap mapRedis = redissonClient.getMap(key); + mapRedis.putAll(map); + mapRedis.expire(Duration.ofDays(15)); } public void initCaller(UserData callerUserData) { @@ -58,6 +68,7 @@ public class UserDataCache { public void hMSet(Long roomId,Integer userType,Map data) { String key = getKey(roomId, userType); - redisTemplate.opsForHash().putAll(key,data); +// redisTemplate.opsForHash().putAll(key,data); + redissonClient.getMap(key).putAll(data); } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/dto/WsRMsgGen.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/dto/WsRMsgGen.java index 808f2502..65d739fd 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/dto/WsRMsgGen.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/dto/WsRMsgGen.java @@ -53,8 +53,8 @@ public class WsRMsgGen { public static WsR hangup(String message, Long roomId, Integer hangUpType) { Map map = new HashMap<>(); - map.put("roomid","roomId"); - map.put("type","hangUpType"); + map.put("roomid",roomId); + map.put("type",hangUpType); WsR> ok = WsR.ok(map); ok.setMethod("hangup"); ok.setMsg(message); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/MessageHandleApplication.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/MessageHandleApplication.java index bd743772..acf4a9ec 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/MessageHandleApplication.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/MessageHandleApplication.java @@ -10,8 +10,8 @@ import com.ruoyi.cai.ws.constant.HangUpEnums; import com.ruoyi.cai.ws.dto.WsToken; import com.ruoyi.cai.ws.service.CheckConnectionDTO; import com.ruoyi.cai.ws.service.RoomService; -import com.ruoyi.cai.ws.util.MapGetUtil; import com.ruoyi.cai.ws.util.WsExceptionUtil; +import jodd.util.StringUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.socket.TextMessage; @@ -36,6 +36,9 @@ public class MessageHandleApplication { public void processOn(WebSocketSession session, TextMessage message) { String payload = message.getPayload(); + if(StringUtil.isEmpty(payload)){ + return; + } JSONObject jsonObject = JSON.parseObject(payload); Object method = jsonObject.get("method"); if(method == null){ diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java index 296d2c3a..0f9528f6 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java @@ -1,11 +1,17 @@ package com.ruoyi.cai.ws.handler; +import cn.hutool.core.util.URLUtil; +import com.google.common.base.Splitter; import com.ruoyi.cai.chat.ChatManager; +import com.ruoyi.cai.ws.cache.FdCtxDataCache; import com.ruoyi.cai.ws.constant.WebSocketConstants; import com.ruoyi.cai.ws.holder.WebSocketSessionHolder; import com.ruoyi.cai.ws.processon.OpenLogic; +import com.ruoyi.cai.ws.service.RoomService; import com.ruoyi.cai.ws.util.WebSocketUtils; +import com.ruoyi.common.utils.StringUtils; import lombok.extern.slf4j.Slf4j; +import org.apache.http.client.utils.URIUtils; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -14,6 +20,9 @@ import org.springframework.web.socket.handler.AbstractWebSocketHandler; import javax.websocket.server.PathParam; import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; /** @@ -34,10 +43,27 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler { */ @Override public void afterConnectionEstablished(WebSocketSession session) { + URI uri = session.getUri(); + Map para = getPara(uri == null ? null : uri.getQuery()); + Map attributes = session.getAttributes(); + attributes.putAll(para); openLogic.processOn(session); log.info("[connect] sessionId: {},userId:{}", session.getId(), session.getId()); } + private Map getPara(String uri) { + Map map = new HashMap<>(); + if(StringUtils.isEmpty(uri)){ + return map; + } + String[] keys = uri.split("&"); + for (String key : keys) { + String[] split = key.split("="); + map.put(split[0],split.length > 1 ? split[1] : ""); + } + return map; + } + /** * 处理发送来的文本消息 * @@ -47,7 +73,11 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler { */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - messageHandleApplication.processOn(session,message); + try { + messageHandleApplication.processOn(session,message); + }catch (Exception e){ + log.error("ws消息处理失败!需要开发检查问题!",e); + } } @Override @@ -79,6 +109,10 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler { log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage()); } + @Autowired + private RoomService roomService; + @Autowired + private FdCtxDataCache fdCtxDataCache; /** * 连接关闭后 * @@ -88,6 +122,7 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler { @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { String token = String.valueOf(session.getAttributes().get("token")); + // TODO 连接关闭 是否要删除fd的关系 WebSocketSessionHolder.removeSession(token); log.info("[disconnect] sessionId: {},token:{}", session.getId(), token); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/HangupMessageHandler.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/HangupMessageHandler.java index 04fe8250..1178a8d6 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/HangupMessageHandler.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/HangupMessageHandler.java @@ -37,7 +37,7 @@ public class HangupMessageHandler extends AbstractMessageHandle implements IMess return; } // 触发结算 - settleService.processOn(room); + settleService.processOn(room.getRoomId()); Integer type = fdCtxData.isCaller() ? HangUpEnums.FROM.getCode() : HangUpEnums.TO.getCode(); sendToCurrent(fdCtxData,WsRMsgGen.hangup("您已挂断",room.getRoomId(), type)); sendToTar(fdCtxData,WsRMsgGen.hangup("对方已挂断",room.getRoomId(), type)); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java index 8dea5c52..fc9859e0 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java @@ -9,18 +9,18 @@ import com.ruoyi.cai.trd.ImMsgGen; import com.ruoyi.cai.ws.bean.Room; import com.ruoyi.cai.ws.bean.UserData; import com.ruoyi.cai.ws.cache.OnlineDataCache; -import com.ruoyi.cai.ws.cache.RoomCtxCache; import com.ruoyi.cai.ws.constant.RoomStatusEnums; import com.ruoyi.cai.ws.service.RoomService; +import com.ruoyi.cai.ws.service.SettleService; import com.ruoyi.yunxin.Yunxin; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.Set; -import java.util.concurrent.TimeUnit; @Component +@Slf4j public class CheckTimeOutJob { @Autowired private OnlineDataCache onlineDataCache; @@ -29,13 +29,14 @@ public class CheckTimeOutJob { @Autowired private UserCallService userCallService; @Autowired - private RoomCtxCache roomCtxCache; + private SettleService settleService; @Autowired private Yunxin yunxin; + /** - * 1 分钟执行一次 + * 检查房间是不是三分钟没有接听, 需要自动挂断掉 */ - @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS) +// @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS) public void run(){ Set roomIdStr = onlineDataCache.getAll(); for (String roomIdS : roomIdStr) { @@ -45,7 +46,11 @@ public class CheckTimeOutJob { } public void deal(Long roomId){ + log.info("开始执行房间超时检测,是否一直不接 roomId={}",roomId); Room room = roomService.load(roomId); + if(room == null){ + return; + } if(!RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus()) && !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){ return; @@ -66,6 +71,7 @@ public class CheckTimeOutJob { userCallService.update(Wrappers.lambdaUpdate(UserCall.class) .eq(UserCall::getId,roomId) .set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode())); + // TODO 需要放开主播的接听状态 roomService.closeAllFd(roomId); ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0); yunxin.sendToSync(receiverUserData.getId(),callUserData.getId(),imDataRes); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java index 73e362fb..756bfe51 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java @@ -41,7 +41,7 @@ public class HeartbeatJob { /** - * 30秒执行一次 + * 30秒执行一次 心跳只处理接听后的心跳 */ @Scheduled(fixedDelay = 30,timeUnit = TimeUnit.SECONDS) public void run(){ @@ -50,6 +50,9 @@ public class HeartbeatJob { try { Long roomId = Long.valueOf(roomIdS); Room room = roomService.load(roomId); + if(room == null){ + return; + } if(!room.isCanCall()){ return; } @@ -69,7 +72,7 @@ public class HeartbeatJob { userCallService.update(Wrappers.lambdaUpdate(UserCall.class) .eq(UserCall::getId,roomId) .set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode())); - settleService.processOn(room); + settleService.processOn(roomId); } List keys = roomCtxCache.getSessionKeysByRoomId(roomId); RoomWebSocketUtil.sendSendMessage(keys, hangup); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java index 60af87dc..69f0a78a 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java @@ -25,17 +25,13 @@ public class SettleJob { /** * 每 1 分钟执行一次 */ - @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS) +// @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS) public void run(){ Set all = onlineDataCache.getAll(); for (String roomIdS : all) { try { Long roomId = Long.valueOf(roomIdS); - Room room = roomService.load(roomId); - if(room == null){ - return; - } - settleService.processOn(room); + settleService.processOn(roomId); }catch (Exception e){ log.info("定时任务结算失败!",e); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java index 194e08da..77912c82 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java @@ -48,7 +48,7 @@ public class WebSocketManager { return null; } Room room = roomService.load(roomId); - if(room == null){ + if(room == null || !room.isCanCall()){ return null; } return room; @@ -104,7 +104,6 @@ public class WebSocketManager { RoomData roomData = new RoomData(); roomData.setRoomId(call.getId()); roomData.setCallPrice(call.getCallPrice()); - roomData.setSkillName(call.getSkillName()); roomData.setStatus(call.getStatus()); roomData.setVideoDivide(call.getReceiverVideoDivide()); roomDataCache.init(roomData); @@ -119,7 +118,7 @@ public class WebSocketManager { receiveUserData.setRoomId(call.getId()); receiveUserData.setNickname(receiverUser.getNickname()); receiveUserData.setUserCode(receiverUser.getUsercode()); - userDataCache.initReceiver(callerUserData); + userDataCache.initReceiver(receiveUserData); callerRoomCache.addRoom(call.getFromUid(),call.getToUid(),call.getId()); return call.getId(); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java index be803827..00d7f409 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java @@ -73,7 +73,7 @@ public class OpenLogic { Long roomId = wsToken.getRoomId(); Long userId = wsToken.getUserId(); Room room = roomService.load(roomId); - if(room == null || (room.getCallUserData().getId().equals(userId) && room.getReceiverUserData().getId().equals(userId))){ + if(room == null || (!room.getCallUserData().getId().equals(userId) && !room.getReceiverUserData().getId().equals(userId))){ WsExceptionUtil.throwException(session,"房间不可用", HangUpEnums.OTHER,roomId); return; } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java index cd62da8a..697fb16b 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java @@ -138,8 +138,8 @@ public class RoomService { if(beginTime == null){ return 0L; } - if(roomData.getHangUpTime() != null){ - return roomData.getHangUpTime() - roomData.getBeginTime(); + if(roomData.getHangupTime() != null){ + return roomData.getHangupTime() - roomData.getBeginTime(); } return DateUtil.currentSeconds() - beginTime; @@ -180,7 +180,7 @@ public class RoomService { Long receiverId = receiverUserData.getId(); Long callerId = room.getCallUserData().getId(); // 呼叫方释放资源 - callerRoomCache.delRoom(receiverId,roomId); + callerRoomCache.delRoom(callerId,receiverId,roomId); userService.updateVideoStatus(callerId,0); // 接收方释放资源(已连接的情况下) if(receiverUserData.getConnectTime() != null && receiverUserData.getConnectTime() > 0){ @@ -195,11 +195,25 @@ public class RoomService { } // 修改释放状态 Map map = new HashMap<>(); - map.put("releaseRes",1); + map.put("releaseRes",true); roomDataCache.hMSet(roomId,map); }catch (Exception e){ log.error("释放房间资源失败!房间号:{}",roomId,e); } } + + /** + * 检查是否正在通话中 + */ + public boolean checkRunningVideo(Long toUserId, Long fromUserId) { + Long roomId = callerRoomCache.getRoomId(toUserId, fromUserId); + if(roomId != null){ + Room room = load(roomId); + if(room != null && room.isCanCall()){ + return true; + } + } + return false; + } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java index 17c03f53..d66143b0 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java @@ -76,7 +76,7 @@ public class SettleService { boolean b = roomService.hangUp(room.getRoomId()); if(b){ // 结算操作 - settleService.processOn(room); + settleService.processOn(room.getRoomId()); } // 向客户端发送挂断指令 WsR r = WsRMsgGen.hangup("拨打方余额不足", room.getRoomId(), HangUpEnums.NOTMONEY.getCode()); @@ -99,10 +99,14 @@ public class SettleService { /** * 结算处理 - * @param room */ @SneakyThrows - public void processOn(Room room){ + public void processOn(Long roomId){ + Room room = roomService.load(roomId); + if(room == null){ + log.warn("房间不存在,无需结算 roomId={}",roomId); + return; + } String lock = LockManager.getVideoSettleLock(room.getRoomId()); RLock clientLock = redissonClient.getLock(lock); boolean locked = clientLock.isLocked(); @@ -110,7 +114,7 @@ public class SettleService { log.info("正在结算中,稍等!"); return; } - boolean lockFlag = clientLock.tryLock(5, TimeUnit.SECONDS); + boolean lockFlag = clientLock.tryLock(); if(!lockFlag){ log.info("正在结算中,稍等!"); return; @@ -133,7 +137,6 @@ public class SettleService { } if(!room.isReleaseRes()){ // 房间资源是否已经释放 roomService.releaseRes(room.getRoomId()); - return; } Long roomId = room.getRoomId(); // 未通话,无需结算