From 28623f17d18a2baa5be89fc33d51ea6ca229d098 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E8=89=AF=28004796=29?= Date: Fri, 12 Jan 2024 17:50:50 +0800 Subject: [PATCH] =?UTF-8?q?=E7=9B=B4=E6=92=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ruoyi/cai/dto/video/VideoSettleResp.java | 12 ++ .../cai/dto/video/WithholdingFeeUserResp.java | 9 + .../com/ruoyi/cai/manager/LockManager.java | 5 + .../java/com/ruoyi/cai/mq/AmqpProducer.java | 15 ++ .../java/com/ruoyi/cai/mq/CommonDelayDto.java | 9 + .../com/ruoyi/cai/mq/CommonDelayMqConfig.java | 40 ++++ .../cai/mq/{ => consumer}/AmqpConsumer.java | 18 +- .../mq/consumer/CheckTimeOutMqConsumer.java | 30 +++ .../mq/consumer/CommonDelayMqConsumer.java | 34 ++++ .../com/ruoyi/cai/service/AccountService.java | 8 +- .../cai/service/impl/AccountServiceImpl.java | 157 ++++++++++++++- .../java/com/ruoyi/cai/trd/ImDataRes.java | 1 + .../main/java/com/ruoyi/cai/trd/ImMsgGen.java | 3 +- .../java/com/ruoyi/cai/ws/bean/RoomData.java | 11 +- .../ruoyi/cai/ws/cache/OnlineDataCache.java | 7 + .../com/ruoyi/cai/ws/cache/RoomDataCache.java | 16 +- .../ws/handler/MessageHandleApplication.java | 18 +- .../cai/ws/handler/RoomWebSocketHandler.java | 6 +- .../handler/message/AgreeMessageHandle.java | 4 + .../handler/message/CancelMessageHandler.java | 5 + .../handler/message/HangupMessageHandler.java | 12 +- .../handler/message/RefuseMessageHandler.java | 4 +- .../com/ruoyi/cai/ws/job/CheckTimeOutJob.java | 75 +++++++ .../com/ruoyi/cai/ws/job/HeartbeatJob.java | 99 +++++++++ .../java/com/ruoyi/cai/ws/job/SettleJob.java | 44 ++++ .../com/ruoyi/cai/ws/processon/OpenLogic.java | 10 +- .../com/ruoyi/cai/ws/service/RoomService.java | 15 ++ .../ruoyi/cai/ws/service/SettleService.java | 189 +++++++++++------- 28 files changed, 746 insertions(+), 110 deletions(-) create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/dto/video/VideoSettleResp.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/dto/video/WithholdingFeeUserResp.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayDto.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java rename ruoyi-cai/src/main/java/com/ruoyi/cai/mq/{ => consumer}/AmqpConsumer.java (75%) create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/dto/video/VideoSettleResp.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/dto/video/VideoSettleResp.java new file mode 100644 index 00000000..aa65205d --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/dto/video/VideoSettleResp.java @@ -0,0 +1,12 @@ +package com.ruoyi.cai.dto.video; + +import com.ruoyi.cai.domain.ConsumeLog; +import lombok.Data; + +@Data +public class VideoSettleResp { + private ConsumeLog consumeLog; + private Long payCoin; + private Long payIncome; + private Long anchorIncome; +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/dto/video/WithholdingFeeUserResp.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/dto/video/WithholdingFeeUserResp.java new file mode 100644 index 00000000..ec912c75 --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/dto/video/WithholdingFeeUserResp.java @@ -0,0 +1,9 @@ +package com.ruoyi.cai.dto.video; + +import lombok.Data; + +@Data +public class WithholdingFeeUserResp { + private Long decrCoin; + private Long decrIncomeCoin; +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/manager/LockManager.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/manager/LockManager.java index 232c8f1d..ea83520a 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/manager/LockManager.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/manager/LockManager.java @@ -10,6 +10,7 @@ public class LockManager { public static final String LOCK_SEND_GREET_REDIS = RedisConstant.REDIS_P + "lock:sendGreet:%s"; public static final String LOCK_SEND_GUARD_REDIS = RedisConstant.REDIS_P + "lock:sendGuard:%s"; public static final String LOCK_SEND_GIFT_REDIS = RedisConstant.REDIS_P + "lock:sendGift:%s"; + public static final String LOCK_VIDEO_SETTLE_REDIS = RedisConstant.REDIS_P + "lock:videoSettle:%s"; public static String getRegisterLockKey(String mobile){ return String.format(LOCK_REGISTER_REDIS,mobile); @@ -31,6 +32,10 @@ public class LockManager { return String.format(LOCK_SEND_GUARD_REDIS,userId); } + public static String getVideoSettleLock(Long roomId){ + return String.format(LOCK_VIDEO_SETTLE_REDIS,roomId); + } + public static String getSendGiftLock(Long userId){ return String.format(LOCK_SEND_GIFT_REDIS,userId); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java index 22bf8fbe..69f0d84e 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpProducer.java @@ -1,5 +1,7 @@ package com.ruoyi.cai.mq; +import com.alibaba.fastjson.JSON; +import com.ruoyi.cai.mq.consumer.AmqpConsumer; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -22,4 +24,17 @@ public class AmqpProducer { return messagePostProcessor; }); } + + public void sendCommonDelayMq(Integer type,Long roomId,Integer timeout){ + CommonDelayDto dto = new CommonDelayDto(); + dto.setType(type); + dto.setRoomId(roomId); + rabbitTemplate.convertAndSend(CheckTimeOutMqConfig.EXCHANGE_NAME, + CheckTimeOutMqConfig.ROUTING_KEY, + JSON.toJSONString(dto), + messagePostProcessor -> { + messagePostProcessor.getMessageProperties().setDelay(timeout*1000); // 设置延迟时间,单位毫秒 + return messagePostProcessor; + }); + } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayDto.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayDto.java new file mode 100644 index 00000000..c7061b4d --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayDto.java @@ -0,0 +1,9 @@ +package com.ruoyi.cai.mq; + +import lombok.Data; + +@Data +public class CommonDelayDto { + private Integer type; + private Long roomId; +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java new file mode 100644 index 00000000..11b9205b --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayMqConfig.java @@ -0,0 +1,40 @@ +package com.ruoyi.cai.mq; + +import org.springframework.amqp.core.*; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; + + +@Configuration +public class CommonDelayMqConfig { + + public static final String EXCHANGE_NAME = "commonDelayExchange"; + public static final String QUEUE_NAME = "commonDelayQueue"; + public static final String ROUTING_KEY = "commonDelayRouting"; + + @Bean + public CustomExchange delayedExchange() { + HashMap args = new HashMap<>(); + args.put("x-delayed-type", "direct"); + return new CustomExchange(EXCHANGE_NAME, + "x-delayed-message", // 消息类型 + true, // 是否持久化 + false,// 是否自动删除 + args); + } + + @Bean + public Queue delayedQueue() { + return QueueBuilder.durable(QUEUE_NAME) + .withArgument("x-delayed-type", "direct") + .build(); + } + + @Bean + public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) { + return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs(); + } + +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpConsumer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/AmqpConsumer.java similarity index 75% rename from ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpConsumer.java rename to ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/AmqpConsumer.java index b6601946..07804304 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpConsumer.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/AmqpConsumer.java @@ -1,10 +1,13 @@ -package com.ruoyi.cai.mq; +package com.ruoyi.cai.mq.consumer; +import com.ruoyi.cai.service.ConsumeLogService; +import com.ruoyi.cai.ws.service.SettleService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @@ -25,19 +28,18 @@ public class AmqpConsumer { System.out.println(msg); }*/ + @Autowired + private ConsumeLogService consumeLogService; + @RabbitListener(bindings = @QueueBinding( value = @Queue(value = CALCULATE_SALES_QUEUE, durable = "false", autoDelete = "false"), exchange = @Exchange(value = CALCULATE_SALES_EXCHANGE), key = CALCULATE_SALES_KEY) ,containerFactory = "customContainerFactory") public void calculateSalesQueue(String message) { - log.info("calculateSalesQueue: " + message); - } - - @RabbitListener(queues = CheckTimeOutMqConfig.QUEUE_NAME - ,containerFactory = "customContainerFactory") - public void checkTimeOutMq(String message) { - log.info("checkTimeOutMq: " + message); + log.info("接受到到分销处理请求: message=" + message); + consumeLogService.dealFenxiao(Long.valueOf(message)); + log.info("分销处理完成: message=" + message); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java new file mode 100644 index 00000000..4e501e81 --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CheckTimeOutMqConsumer.java @@ -0,0 +1,30 @@ +package com.ruoyi.cai.mq.consumer; + +import com.ruoyi.cai.mq.AmqpProducer; +import com.ruoyi.cai.mq.CheckTimeOutMqConfig; +import com.ruoyi.cai.ws.service.SettleService; +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.index.qual.SameLen; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class CheckTimeOutMqConsumer { + @Autowired + private SettleService settleService; + @Autowired + private AmqpProducer amqpProducer; + + @RabbitListener(queues = CheckTimeOutMqConfig.QUEUE_NAME + ,containerFactory = "customContainerFactory") + public void checkTimeOutMq(String message) { + log.info("checkTimeOutMq: " + message); + boolean next = settleService.withholdingFee(Long.valueOf(message)); + if(next){ + // 1分钟后继续执行 + amqpProducer.sendCheckTimeOut(message,60); + } + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java new file mode 100644 index 00000000..fed64ff4 --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java @@ -0,0 +1,34 @@ +package com.ruoyi.cai.mq.consumer; + +import com.alibaba.fastjson2.JSON; +import com.ruoyi.cai.mq.AmqpProducer; +import com.ruoyi.cai.mq.CommonDelayDto; +import com.ruoyi.cai.mq.CommonDelayMqConfig; +import com.ruoyi.cai.ws.job.CheckTimeOutJob; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class CommonDelayMqConsumer { + @Autowired + private CheckTimeOutJob checkTimeOutJob; + @Autowired + private AmqpProducer amqpProducer; + + @RabbitListener(queues = CommonDelayMqConfig.QUEUE_NAME + ,containerFactory = "customContainerFactory") + public void checkTimeOutMq(String message) { + log.info("CommonDelayMqConsumer: " + message); + CommonDelayDto dto = JSON.parseObject(message, CommonDelayDto.class); + switch (dto.getType()){ + case 1: + checkTimeOutJob.deal(dto.getRoomId()); + break; + default: + break; + } + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/AccountService.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/AccountService.java index bdba48b7..ef7b3aa6 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/AccountService.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/AccountService.java @@ -6,10 +6,12 @@ import com.ruoyi.cai.domain.Account; import com.ruoyi.cai.domain.ConsumeLog; import com.ruoyi.cai.domain.User; import com.ruoyi.cai.dto.admin.vo.AccountAdminVo; +import com.ruoyi.cai.dto.video.VideoSettleResp; +import com.ruoyi.cai.dto.video.WithholdingFeeUserResp; import com.ruoyi.cai.enums.AccountBusinessEnum; import com.ruoyi.cai.enums.AccountChangeEnum; +import com.ruoyi.cai.ws.bean.Room; import com.ruoyi.common.core.domain.PageQuery; -import org.springframework.transaction.annotation.Transactional; /** * 用户账户Service接口 @@ -32,4 +34,8 @@ public interface AccountService extends IService { Page pageAdmin(PageQuery pageQuery, AccountAdminVo bo); void distribution(Long userId, Long amount, AccountChangeEnum oneEnum,Long traceId); + + WithholdingFeeUserResp withholdingFeeUser(Long userId, Long price); + + VideoSettleResp videoSettle(Room room); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/AccountServiceImpl.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/AccountServiceImpl.java index 6c70756d..e65aeea2 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/AccountServiceImpl.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/AccountServiceImpl.java @@ -1,13 +1,17 @@ package com.ruoyi.cai.service.impl; +import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson2.JSON; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.ruoyi.cai.domain.*; import com.ruoyi.cai.dto.admin.vo.AccountAdminVo; +import com.ruoyi.cai.dto.video.VideoSettleResp; +import com.ruoyi.cai.dto.video.WithholdingFeeUserResp; import com.ruoyi.cai.enums.AccountBusinessEnum; import com.ruoyi.cai.enums.AccountChangeEnum; +import com.ruoyi.cai.enums.ConsumeLogType; import com.ruoyi.cai.enums.SystemConfigEnum; import com.ruoyi.cai.manager.IdManager; import com.ruoyi.cai.manager.SystemConfigManager; @@ -15,15 +19,19 @@ import com.ruoyi.cai.mapper.AccountMapper; import com.ruoyi.cai.pay.RechargeTypeEnum; import com.ruoyi.cai.service.*; import com.ruoyi.cai.util.NumCaUtil; +import com.ruoyi.cai.ws.bean.Room; +import com.ruoyi.cai.ws.bean.RoomData; +import com.ruoyi.cai.ws.bean.UserData; +import com.ruoyi.cai.ws.cache.RoomDataCache; +import com.ruoyi.cai.ws.constant.RoomStatusEnums; +import com.ruoyi.cai.ws.service.RoomService; import com.ruoyi.common.core.domain.PageQuery; import com.ruoyi.common.exception.ServiceException; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import javax.xml.transform.Source; import java.math.BigDecimal; /** @@ -127,7 +135,7 @@ public class AccountServiceImpl extends ServiceImpl impl coin = -account.getCoin(); incomeCoin = diff; boolean decrCoin = baseMapper.decrCoin(fromUserId, -coin); - boolean decrIncomeCoin = baseMapper.decrIncomeCoin(fromUserId, -diff); + boolean decrIncomeCoin = baseMapper.decrIncomeCoin(fromUserId, -incomeCoin); if(decrCoin && decrIncomeCoin){ flag = true; } @@ -216,4 +224,147 @@ public class AccountServiceImpl extends ServiceImpl impl accountChangeLogService.saveLogNoAdmin(userId,user.getUsercode(), RechargeTypeEnum.COIN_INCOME,accountChangeEnum,amount,traceId); } + /** + * 视频-预付款 + * @param userId + * @param price + */ + @Override + @Transactional(rollbackFor = Exception.class) + public WithholdingFeeUserResp withholdingFeeUser(Long userId, Long price) { + Account account = this.getByUserId(userId); + if(account == null){ + throw new ServiceException("账户异常"); + } + long totalCoin = account.getIncomeCoin() + account.getCoin(); + if(totalCoin < price){ + throw new ServiceException("余额不足"); + } + boolean flag = false; + long coin = -price; // 消费余额 + long incomeCoin = 0; // 消费收益 + long diff = account.getCoin() - price; + // 充值币够用 + if(diff > 0){ + flag = baseMapper.decrCoin(userId, -coin); + } else { // 充值币不够用 + coin = -account.getCoin(); + incomeCoin = diff; + boolean decrCoin = baseMapper.decrCoin(userId, -coin); + boolean decrIncomeCoin = baseMapper.decrIncomeCoin(userId, -incomeCoin); + if(decrCoin && decrIncomeCoin){ + flag = true; + } + } + // 扣费不成功 + if(!flag){ + throw new ServiceException("扣费失败"); + } + WithholdingFeeUserResp resp = new WithholdingFeeUserResp(); + resp.setDecrCoin(-coin); + resp.setDecrIncomeCoin(-incomeCoin); + return resp; + } + + @Autowired + private RoomService roomService; + @Autowired + private UserCallService userCallService; + + @Override + public VideoSettleResp videoSettle(Room room) { + RoomData roomData = room.getRoomData(); + UserCall userCall = userCallService.getById(roomData.getRoomId()); + UserData callUserData = room.getCallUserData(); + Long callPrice = roomData.getCallPrice(); + Long callTime = roomService.getCallTime(room); + // 本次支付金额 + long totalAmount = callPrice * (callTime / 60); // 本次需要支付的金额 + Long payCoin = roomData.getPayCoin(); // 已经支付的余额 + Long payIncome = roomData.getPayIncome(); // 已经支付的收益 + // 补差价 + long diff = totalAmount - (payCoin + payIncome); + Account callAccount = this.getByUserId(callUserData.getId()); + long userAmount = callAccount.getCoin() + callAccount.getIncomeCoin(); // 用户余额 + if(diff > 0){ + // 账户上面有余额 + if(userAmount > 0){ + diff = Math.min(userAmount, diff); + log.info("roomid {} 已支付 {} 仍需要补差价:{}, 用户余额:{} ", + roomData.getRoomId(),roomData.getPayCoin() + roomData.getPayIncome(),diff, userAmount); + WithholdingFeeUserResp resp = withholdingFeeUser(callUserData.getId(), diff); + payCoin = payCoin + resp.getDecrCoin(); + payIncome = payIncome + resp.getDecrIncomeCoin(); + } else { + log.info("房间需要补差价,但用户余额不足 roomid {} 已支付 {} 仍需要补差价:{}", roomData.getRoomId(),roomData.getPayCoin() + roomData.getPayIncome(),diff); + } + }else{ + // 退钱逻辑 + long incsCoin = 0L; // 需要退的金额 + long incsIncomeCoin = 0L; // 需要退的收益金额 + diff = Math.abs(diff); // 待退款的钱 + if(roomData.getPayCoin() > diff){ // 消费的金额大于待退款金额 + incsCoin = diff; + }else { + if(roomData.getPayIncome() > (diff - roomData.getPayCoin())){ // 消费金额不够退,但是收益金额扣扣费 + incsCoin = roomData.getPayCoin(); + incsIncomeCoin = roomData.getPayIncome() - (diff - roomData.getPayCoin()); + } else { // 消费金额不够退 余额也不够退 能退多少是多少 + incsCoin = roomData.getPayCoin(); + incsIncomeCoin = roomData.getPayIncome(); + } + } + if(incsCoin > 0){ + payCoin = payCoin - incsCoin; + baseMapper.incsCoin(callUserData.getId(), incsCoin); + } + if(incsIncomeCoin > 0){ + payIncome = payIncome - incsIncomeCoin; + baseMapper.incsCoin(callUserData.getId(), incsIncomeCoin); + } + } + Long amountReal = payCoin + payIncome; // 实际支付的金额 + Long anchorAmount = NumCaUtil.coin(amountReal,roomData.getVideoDivide()); + User fromUser = userService.getById(userCall.getFromUid()); + User toUser = userService.getById(userCall.getToUid()); + + baseMapper.incsIncomeCoin(toUser.getId(),anchorAmount); + Long tractId = IdManager.nextId(); + // 记录消费方的流水 + if(payCoin != 0){ + accountChangeLogService.saveLogNoAdmin(fromUser.getId(),fromUser.getUsercode(), RechargeTypeEnum.COIN, AccountChangeEnum.USER_VIDEO, payCoin, tractId); + } + if(payIncome != 0){ + accountChangeLogService.saveLogNoAdmin(fromUser.getId(),fromUser.getUsercode(), RechargeTypeEnum.COIN_INCOME,AccountChangeEnum.USER_VIDEO,payIncome,tractId); + } + // 记录接收方的流水 + if(anchorAmount != 0){ + accountChangeLogService.saveLogNoAdmin(toUser.getId(),toUser.getUsercode(), RechargeTypeEnum.COIN_INCOME,AccountChangeEnum.ANCHOR_VIDEO,anchorAmount,tractId); + } + ConsumeLog consumeLog = new ConsumeLog(); + consumeLog.init(fromUser,toUser); + consumeLog.setType(ConsumeLogType.VIDEO.getCode()); + consumeLog.setBusinessEnum(AccountBusinessEnum.VIDEO.name()); + consumeLog.setTraceId(tractId); + consumeLog.setTargetRate(roomData.getVideoDivide()); + consumeLog.setAmount(amountReal); + consumeLog.setAnchorAmount(anchorAmount); + // 获取分销的比例和用户 + consumeLogService.calculateInitFenxiao(consumeLog); + consumeLogService.save(consumeLog); + userCallService.update(Wrappers.lambdaUpdate(UserCall.class) + .eq(UserCall::getId, userCall.getId()) + .set(UserCall::getBeginTime, DateUtil.date(roomData.getBeginTime()).toLocalDateTime()) + .set(UserCall::getEndTime,DateUtil.date(roomData.getHangUpTime()).toLocalDateTime()) + .set(UserCall::getCallTime,roomData.getHangUpTime() - roomData.getBeginTime()) + .set(UserCall::getCallAmount,amountReal) + .set(UserCall::getCallIncome, anchorAmount) + .set(UserCall::getStatus, RoomStatusEnums.STATUS_HANGUP.getCode())); + VideoSettleResp resp = new VideoSettleResp(); + resp.setConsumeLog(consumeLog); + resp.setPayIncome(payIncome); + resp.setPayCoin(payCoin); + return resp; + } + } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/ImDataRes.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/ImDataRes.java index 15b82a74..11d60f85 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/ImDataRes.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/ImDataRes.java @@ -9,6 +9,7 @@ public class ImDataRes { @Data public static class ImData { + // 通话状态,1 取消 2 拒绝 3 超时 4 已通话 private Integer callType; private Integer status; private Long fromUid; diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/ImMsgGen.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/ImMsgGen.java index 39b1beb9..5822c0d8 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/ImMsgGen.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/ImMsgGen.java @@ -13,7 +13,7 @@ public class ImMsgGen { ImDataRes imDataRes = new ImDataRes(); imDataRes.setType(15); ImDataRes.ImData imData = new ImDataRes.ImData(); - imData.setCallType(1); + imData.setCallType(1); // 通话状态,1 取消 2 拒绝 3 超时 4 已通话 imData.setStatus(status); imData.setFromUid(from); imData.setToUid(to); @@ -21,4 +21,5 @@ public class ImMsgGen { imDataRes.setData(imData); return imDataRes; } + } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java index 96204340..0e00820c 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java @@ -1,5 +1,6 @@ package com.ruoyi.cai.ws.bean; +import com.ruoyi.cai.ws.constant.RoomStatusEnums; import lombok.Data; import java.math.BigDecimal; @@ -13,11 +14,19 @@ public class RoomData { private BigDecimal videoDivide; private Long payCoin = 0L; private Long payIncome = 0L; - private Long hangUpTime; // 掉线时间 + private Long hangUpTime; // 结束时间 private Long settleTime; // 结算时间 private Long beginTime; // 开始时间 private boolean releaseRes = false; + + public boolean isCanCall(){ + return RoomStatusEnums.isCanCall(status); + } + + public boolean isOnline() { + return RoomStatusEnums.STATUS_AGREE.getCode().equals(status); + } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java index 6b581465..8ae34c8b 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/OnlineDataCache.java @@ -5,6 +5,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; +import java.util.List; +import java.util.Set; + @Component public class OnlineDataCache { @Autowired @@ -14,6 +17,10 @@ public class OnlineDataCache { return RedisConstant.ONLINE_ROOM_DATA; } + public Set getAll(){ + return redisTemplate.opsForSet().members(getKey()); + } + public void add(Long roomId){ redisTemplate.opsForSet().add(getKey(), String.valueOf(roomId)); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomDataCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomDataCache.java index a3c5bb0b..94e852c7 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomDataCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomDataCache.java @@ -1,22 +1,17 @@ package com.ruoyi.cai.ws.cache; -import cn.hutool.json.JSONObject; import com.alibaba.fastjson2.JSON; import com.ruoyi.cai.ws.bean.RoomData; import com.ruoyi.cai.ws.constant.RedisConstant; import com.ruoyi.cai.ws.constant.RoomStatusEnums; import org.apache.commons.lang3.BooleanUtils; -import org.apache.poi.ss.formula.functions.Roman; -import org.redisson.api.RBucket; -import org.redisson.api.RMap; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; -import java.util.Arrays; import java.util.Collections; import java.util.Map; @@ -105,4 +100,15 @@ public class RoomDataCache { Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),currentTime); return BooleanUtils.isTrue(execute); } + + private final static String INCS_BLACK_AMOUNT = + "redis.call('hIncrBy', KEYS[1], 'payCoin', ARGV[1])\n" + + "redis.call('hIncrBy', KEYS[1], 'payIncome', ARGV[2])"; + + public boolean incsBlackAmount(Long roomId, Long decrCoin, Long decrIncomeCoin) { + DefaultRedisScript redisScript = new DefaultRedisScript<>(INCS_BLACK_AMOUNT,Boolean.class); + Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), + decrCoin,decrIncomeCoin); + return BooleanUtils.isTrue(execute); + } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/MessageHandleApplication.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/MessageHandleApplication.java index 6f2cf134..bd743772 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/MessageHandleApplication.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/MessageHandleApplication.java @@ -2,10 +2,12 @@ package com.ruoyi.cai.ws.handler; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.ruoyi.cai.chat.ChatManager; import com.ruoyi.cai.ws.bean.FdCtxData; import com.ruoyi.cai.ws.bean.Room; import com.ruoyi.cai.ws.cache.FdCtxDataCache; import com.ruoyi.cai.ws.constant.HangUpEnums; +import com.ruoyi.cai.ws.dto.WsToken; import com.ruoyi.cai.ws.service.CheckConnectionDTO; import com.ruoyi.cai.ws.service.RoomService; import com.ruoyi.cai.ws.util.MapGetUtil; @@ -29,6 +31,8 @@ public class MessageHandleApplication { private RoomService roomService; @Autowired private FdCtxDataCache fdCtxDataCache; + @Autowired + private ChatManager chatManager; public void processOn(WebSocketSession session, TextMessage message) { String payload = message.getPayload(); @@ -38,23 +42,27 @@ public class MessageHandleApplication { return; } Map attributes = session.getAttributes(); - Long roomId = MapGetUtil.getLong(attributes.get("roomId")); - String sessionKey = MapGetUtil.getString(attributes.get("token")); + WsToken wsToken = chatManager.getToken(String.valueOf(attributes.get("token"))); + if(wsToken == null){ + WsExceptionUtil.throwExceptionFast(session,"无效token"); + return; + } + Long roomId = wsToken.getRoomId(); Room room = roomService.load(roomId); if(room == null){ - WsExceptionUtil.throwException("房间不可用", sessionKey,HangUpEnums.OTHER, roomId); + WsExceptionUtil.throwException(session, "房间不可用" ,HangUpEnums.OTHER, roomId); return; } CheckConnectionDTO checkConnect = roomService.checkConnect(room); if(checkConnect != null){ - WsExceptionUtil.throwException(sessionKey,checkConnect.getMessage(),checkConnect.getHangUpEnums(),roomId); + WsExceptionUtil.throwException(session,checkConnect.getMessage(),checkConnect.getHangUpEnums(),roomId); return; } IMessageHandler handler = map.get(String.valueOf(method)); if(handler == null){ return; } - FdCtxData fdCtxData = fdCtxDataCache.getByRoomId(sessionKey); + FdCtxData fdCtxData = fdCtxDataCache.getByRoomId(session.getId()); handler.processOn(room,fdCtxData, jsonObject); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java index 26dfd12d..296d2c3a 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java @@ -13,6 +13,7 @@ import org.springframework.web.socket.*; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import javax.websocket.server.PathParam; +import java.io.IOException; import java.util.Map; /** @@ -32,9 +33,9 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler { * 连接成功后 */ @Override - public void afterConnectionEstablished(@NotNull WebSocketSession session) { + public void afterConnectionEstablished(WebSocketSession session) { openLogic.processOn(session); -// log.info("[connect] sessionId: {},userId:{}", session.getId(), session.getId()); + log.info("[connect] sessionId: {},userId:{}", session.getId(), session.getId()); } /** @@ -46,7 +47,6 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler { */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - String token = String.valueOf(session.getAttributes().get(WebSocketConstants.TOKEN)); messageHandleApplication.processOn(session,message); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/AgreeMessageHandle.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/AgreeMessageHandle.java index 8f9e5efa..cb0b37e7 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/AgreeMessageHandle.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/AgreeMessageHandle.java @@ -3,6 +3,7 @@ package com.ruoyi.cai.ws.handler.message; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.ruoyi.cai.domain.UserCall; +import com.ruoyi.cai.mq.AmqpProducer; import com.ruoyi.cai.service.UserCallService; import com.ruoyi.cai.ws.bean.FdCtxData; import com.ruoyi.cai.ws.bean.Room; @@ -26,6 +27,8 @@ public class AgreeMessageHandle extends AbstractMessageHandle implements IMessag private RoomDataCache roomDataCache; @Autowired private UserCallService userCallService; + @Autowired + private AmqpProducer amqpProducer; @Override public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) { @@ -41,6 +44,7 @@ public class AgreeMessageHandle extends AbstractMessageHandle implements IMessag .eq(UserCall::getId,room.getRoomId()) .set(UserCall::getStatus, RoomStatusEnums.STATUS_AGREE.getCode()) .set(UserCall::getBeginTime, LocalDateTime.now())); + amqpProducer.sendCheckTimeOut(room.getRoomId()+"",60); // 1分钟延时消息,开始扣钱 String message = "提示:禁止任何涉黄、任何微信QQ引导到其它平台行为"; sendToAll(room.getRoomId(), WsRMsgGen.startVideo(room.getRoomId(),0L),WsRMsgGen.sysNotice(message)); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/CancelMessageHandler.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/CancelMessageHandler.java index 31b7d28c..d188f0f8 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/CancelMessageHandler.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/CancelMessageHandler.java @@ -6,6 +6,7 @@ import com.ruoyi.cai.domain.UserCall; import com.ruoyi.cai.service.UserCallService; import com.ruoyi.cai.trd.ImDataRes; import com.ruoyi.cai.trd.ImMsgGen; +import com.ruoyi.cai.ws.cache.RoomDataCache; import com.ruoyi.yunxin.Yunxin; import com.ruoyi.cai.ws.bean.FdCtxData; import com.ruoyi.cai.ws.bean.Room; @@ -38,6 +39,10 @@ public class CancelMessageHandler extends AbstractMessageHandle implements IMess && !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){ return; } + // 挂断房间 + if(!roomService.hangUp(room.getRoomId())){ + return; + } Long roomId = room.getRoomId(); sendToCurrent(fdCtxData,WsRMsgGen.hangup("通话已取消",roomId, HangUpEnums.CANCEL.getCode())); sendToReceiver(roomId,WsRMsgGen.hangup("对方已取消",roomId, HangUpEnums.CANCEL.getCode())); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/HangupMessageHandler.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/HangupMessageHandler.java index a9e50fe5..04fe8250 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/HangupMessageHandler.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/HangupMessageHandler.java @@ -9,6 +9,7 @@ import com.ruoyi.cai.ws.dto.WsRMsgGen; import com.ruoyi.cai.ws.handler.AbstractMessageHandle; import com.ruoyi.cai.ws.handler.IMessageHandler; import com.ruoyi.cai.ws.service.RoomService; +import com.ruoyi.cai.ws.service.SettleService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -20,7 +21,7 @@ public class HangupMessageHandler extends AbstractMessageHandle implements IMess @Autowired private RoomService roomService; @Autowired - private RoomDataCache roomDataCache; + private SettleService settleService; @Override public void processOn(Room room, FdCtxData fdCtxData, JSONObject map) { @@ -31,13 +32,12 @@ public class HangupMessageHandler extends AbstractMessageHandle implements IMess if(!room.getRoomId().equals(fdCtxData.getRoomId())){ return; } - boolean b = roomDataCache.hangUp(room.getRoomId()); - if(!b){ + // 挂断房间 + if(!roomService.hangUp(room.getRoomId())){ return; } - // 触发结算 TODO - - + // 触发结算 + settleService.processOn(room); Integer type = fdCtxData.isCaller() ? HangUpEnums.FROM.getCode() : HangUpEnums.TO.getCode(); sendToCurrent(fdCtxData,WsRMsgGen.hangup("您已挂断",room.getRoomId(), type)); sendToTar(fdCtxData,WsRMsgGen.hangup("对方已挂断",room.getRoomId(), type)); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/RefuseMessageHandler.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/RefuseMessageHandler.java index 61af316d..d2749e94 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/RefuseMessageHandler.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/message/RefuseMessageHandler.java @@ -24,8 +24,6 @@ import org.springframework.stereotype.Component; */ @Component("refuse") public class RefuseMessageHandler extends AbstractMessageHandle implements IMessageHandler { - @Autowired - private RoomDataCache roomDataCache; @Autowired private RoomService roomService; @Autowired @@ -37,7 +35,7 @@ public class RefuseMessageHandler extends AbstractMessageHandle implements IMess if(!fdCtxData.isReceiver() || !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){ return; } - boolean b = roomDataCache.hangUp(room.getRoomId()); + boolean b = roomService.hangUp(room.getRoomId()); if(!b){ return; } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java new file mode 100644 index 00000000..36b59910 --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/CheckTimeOutJob.java @@ -0,0 +1,75 @@ +package com.ruoyi.cai.ws.job; + +import cn.hutool.core.date.DateUtil; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.ruoyi.cai.domain.UserCall; +import com.ruoyi.cai.service.UserCallService; +import com.ruoyi.cai.trd.ImDataRes; +import com.ruoyi.cai.trd.ImMsgGen; +import com.ruoyi.cai.ws.bean.Room; +import com.ruoyi.cai.ws.bean.UserData; +import com.ruoyi.cai.ws.cache.OnlineDataCache; +import com.ruoyi.cai.ws.cache.RoomCtxCache; +import com.ruoyi.cai.ws.constant.RoomStatusEnums; +import com.ruoyi.cai.ws.service.RoomService; +import com.ruoyi.yunxin.Yunxin; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Component +public class CheckTimeOutJob { + @Autowired + private OnlineDataCache onlineDataCache; + @Autowired + private RoomService roomService; + @Autowired + private UserCallService userCallService; + @Autowired + private RoomCtxCache roomCtxCache; + @Autowired + private Yunxin yunxin; + /** + * 1 分钟执行一次 + */ + @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS) + public void run(){ + Set roomIdStr = onlineDataCache.getAll(); + for (String roomIdS : roomIdStr) { + Long roomId = Long.valueOf(roomIdS); + this.deal(roomId); + } + } + + public void deal(Long roomId){ + Room room = roomService.load(roomId); + if(!RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(room.getStatus()) + && !RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(room.getStatus())){ + return; + } + UserData callUserData = room.getCallUserData(); + UserData receiverUserData = room.getReceiverUserData(); + // 检测是不是3分钟没有接听 + Long connectTimeCaller = callUserData.getConnectTime(); + Long connectTimeReceiver = receiverUserData.getConnectTime(); + boolean timeOut = false; + if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){ + timeOut = true; + }else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){ + timeOut = true; + } + if(timeOut){ + if(roomService.hangUp(roomId)){ + userCallService.update(Wrappers.lambdaUpdate(UserCall.class) + .eq(UserCall::getId,roomId) + .set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode())); + roomService.closeAllFd(roomId); + ImDataRes imDataRes = ImMsgGen.callNotice(3, callUserData.getId(), receiverUserData.getId(), 0); + yunxin.sendTo(receiverUserData.getId(),callUserData.getId(),imDataRes); + } + } + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java new file mode 100644 index 00000000..73e362fb --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/HeartbeatJob.java @@ -0,0 +1,99 @@ +package com.ruoyi.cai.ws.job; + +import cn.hutool.core.date.DateUtil; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.ruoyi.cai.domain.UserCall; +import com.ruoyi.cai.service.UserCallService; +import com.ruoyi.cai.ws.bean.Room; +import com.ruoyi.cai.ws.bean.UserData; +import com.ruoyi.cai.ws.cache.OnlineDataCache; +import com.ruoyi.cai.ws.cache.RoomCtxCache; +import com.ruoyi.cai.ws.constant.HangUpEnums; +import com.ruoyi.cai.ws.constant.RoomStatusEnums; +import com.ruoyi.cai.ws.dto.WsR; +import com.ruoyi.cai.ws.dto.WsRMsgGen; +import com.ruoyi.cai.ws.service.RoomService; +import com.ruoyi.cai.ws.service.SettleService; +import com.ruoyi.cai.ws.util.RoomWebSocketUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Component +@Slf4j +public class HeartbeatJob { + + @Autowired + private OnlineDataCache onlineDataCache; + @Autowired + private RoomService roomService; + @Autowired + private UserCallService userCallService; + @Autowired + private SettleService settleService; + @Autowired + private RoomCtxCache roomCtxCache; + + + /** + * 30秒执行一次 + */ + @Scheduled(fixedDelay = 30,timeUnit = TimeUnit.SECONDS) + public void run(){ + Set all = onlineDataCache.getAll(); + for (String roomIdS : all) { + try { + Long roomId = Long.valueOf(roomIdS); + Room room = roomService.load(roomId); + if(!room.isCanCall()){ + return; + } + UserData callUserData = room.getCallUserData(); + UserData receiverUserData = room.getReceiverUserData(); + boolean timeOut = false; + WsR hangup = null; + if(isHeartTimeout(callUserData)){ + timeOut = true; + hangup = WsRMsgGen.hangup("呼叫方连接中断", roomId, HangUpEnums.FROM.getCode()); + }else if(isHeartTimeout(receiverUserData)){ + timeOut = true; + hangup = WsRMsgGen.hangup("接听方连接中断", roomId, HangUpEnums.TO.getCode()); + } + if(timeOut){ + if(roomService.hangUp(roomId)){ + userCallService.update(Wrappers.lambdaUpdate(UserCall.class) + .eq(UserCall::getId,roomId) + .set(UserCall::getStatus, RoomStatusEnums.STATUS_TIMEOUT_CANCEL.getCode())); + settleService.processOn(room); + } + List keys = roomCtxCache.getSessionKeysByRoomId(roomId); + RoomWebSocketUtil.sendSendMessage(keys, hangup); + roomService.closeAllFd(roomId); + } + }catch (Exception e){ + log.error("定时心跳检测失败!",e); + } + + } + } + + /** + * 是否心跳超时 + * @param userData + * @return + */ + private boolean isHeartTimeout(UserData userData){ + if(userData.getConnectTime() == null){ + return false; + } + if(DateUtil.currentSeconds() - userData.getHeartTime() < 120){ + return false; + } + return true; + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java new file mode 100644 index 00000000..60af87dc --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/job/SettleJob.java @@ -0,0 +1,44 @@ +package com.ruoyi.cai.ws.job; + +import com.ruoyi.cai.ws.bean.Room; +import com.ruoyi.cai.ws.cache.OnlineDataCache; +import com.ruoyi.cai.ws.service.RoomService; +import com.ruoyi.cai.ws.service.SettleService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@Component +@Slf4j +public class SettleJob { + @Autowired + private OnlineDataCache onlineDataCache; + @Autowired + private SettleService settleService; + @Autowired + private RoomService roomService; + + /** + * 每 1 分钟执行一次 + */ + @Scheduled(fixedDelay = 60,timeUnit = TimeUnit.SECONDS) + public void run(){ + Set all = onlineDataCache.getAll(); + for (String roomIdS : all) { + try { + Long roomId = Long.valueOf(roomIdS); + Room room = roomService.load(roomId); + if(room == null){ + return; + } + settleService.processOn(room); + }catch (Exception e){ + log.info("定时任务结算失败!",e); + } + } + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java index 8069caea..be803827 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java @@ -3,6 +3,7 @@ package com.ruoyi.cai.ws.processon; import cn.hutool.core.date.DateUtil; import com.ruoyi.cai.chat.ChatManager; import com.ruoyi.cai.executor.ExecutorConstant; +import com.ruoyi.cai.mq.AmqpProducer; import com.ruoyi.cai.service.AnchorService; import com.ruoyi.cai.service.UserService; import com.ruoyi.cai.trd.Agora; @@ -27,7 +28,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.socket.WebSocketSession; -import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -57,7 +57,7 @@ public class OpenLogic { @Autowired private Yunxin yunxin; - public void processOn(WebSocketSession session) throws IOException { + public void processOn(WebSocketSession session) { Map attributes = session.getAttributes(); Object token = attributes.get("token"); if(token == null){ @@ -105,9 +105,11 @@ public class OpenLogic { // 走接收方逻辑 receiverConnection(session,room,userId); } - } + @Autowired + private AmqpProducer amqpProducer; + public void callerConnection(WebSocketSession session,Room room,Long userId){ boolean isFirst = false; Integer status = room.getStatus(); @@ -121,6 +123,7 @@ public class OpenLogic { roomDataCache.setStatus(room.getRoomId(),RoomStatusEnums.STATUS_CALLER_CONNECT); onlineDataCache.add(room.getRoomId()); userService.updateVideoStatus(userId,1); + amqpProducer.sendCommonDelayMq(1,room.getRoomId(),182); isFirst = true; } // 已经接通 @@ -166,6 +169,7 @@ public class OpenLogic { map.put("connectTime", DateUtil.currentSeconds()); map.put("heartTime",DateUtil.currentSeconds()); userDataCache.hMSet(room.getRoomId(), UserDataConstant.TYPE_RECEIVER,map); + amqpProducer.sendCommonDelayMq(1,room.getRoomId(),182); // 房间号状态设置为 接收方已连接 boolean res = roomDataCache.setStatusReceiverConnection(room.getRoomId()); if(!res){ diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java index 7c00b184..cd62da8a 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java @@ -85,6 +85,10 @@ public class RoomService { } } + /** + * 删除房间号 关联的ws链接 同时 踢人下线 + * @param roomId + */ public void closeAllFd(Long roomId){ List sessionKeysByRoomId = roomCtxCache.getSessionKeysByRoomId(roomId); for (String sessionKey : sessionKeysByRoomId) { @@ -117,6 +121,17 @@ public class RoomService { return null; } + /** + * 挂断房间 + * 将房间状态设置为 通话结束,同时设置结束时间 + * + * @param roomId + * @return + */ + public boolean hangUp(Long roomId){ + return roomDataCache.hangUp(roomId); + } + public Long getCallTime(Room room) { RoomData roomData = room.getRoomData(); Long beginTime = roomData.getBeginTime(); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java index 9a8589d5..d980a814 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java @@ -1,22 +1,39 @@ package com.ruoyi.cai.ws.service; import cn.hutool.core.date.DateUtil; +import cn.hutool.extra.spring.SpringUtil; import com.ruoyi.cai.domain.Account; +import com.ruoyi.cai.dto.video.VideoSettleResp; +import com.ruoyi.cai.dto.video.WithholdingFeeUserResp; +import com.ruoyi.cai.manager.LockManager; import com.ruoyi.cai.service.AccountService; import com.ruoyi.cai.trd.ImDataRes; import com.ruoyi.cai.trd.ImMsgGen; +import com.ruoyi.cai.ws.cache.CallerRoomCache; +import com.ruoyi.cai.ws.cache.RoomCtxCache; +import com.ruoyi.cai.ws.constant.HangUpEnums; +import com.ruoyi.cai.ws.dto.WsR; +import com.ruoyi.cai.ws.dto.WsRMsgGen; +import com.ruoyi.cai.ws.util.RoomWebSocketUtil; import com.ruoyi.yunxin.Yunxin; import com.ruoyi.cai.ws.bean.Room; import com.ruoyi.cai.ws.bean.RoomData; import com.ruoyi.cai.ws.bean.UserData; import com.ruoyi.cai.ws.cache.OnlineDataCache; import com.ruoyi.cai.ws.cache.RoomDataCache; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.poi.ss.formula.functions.Roman; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; @Component @Slf4j @@ -32,94 +49,124 @@ public class SettleService { private OnlineDataCache onlineDataCache; @Autowired private Yunxin yunxin; + @Autowired + private RoomCtxCache roomCtxCache; + @Autowired + private RedissonClient redissonClient; - public void processOn(Room room){ + + /** + * 预扣费 + * @param roomId + */ + public boolean withholdingFee(Long roomId){ + Room room = roomService.load(roomId); + if(room == null || !room.isOnline()){ + log.warn("房间已经不存在或者不在线 无需预扣费 roomId={}",roomId); + return false; + } + SettleService settleService = SpringUtil.getBean(SettleService.class); try { - deal(room); - }finally { - + Long userId = room.getCallUserData().getId(); + Long price = room.getRoomData().getCallPrice(); + settleService.withholdingFeeUser(userId,price,room); + return true; + }catch (Exception e){ + log.error("预扣费失败!准备挂断电话",e); + boolean b = roomService.hangUp(room.getRoomId()); + if(b){ + // 结算操作 + settleService.processOn(room); + } + // 向客户端发送挂断指令 + WsR r = WsRMsgGen.hangup("拨打方余额不足", room.getRoomId(), HangUpEnums.NOTMONEY.getCode()); + List sessionKeys = roomCtxCache.getSessionKeysByRoomId(roomId); + RoomWebSocketUtil.sendSendMessage(sessionKeys, r); + roomService.closeAllFd(roomId); + return false; } } + /** + * 预扣费业务操作 + */ + @Transactional(rollbackFor = Exception.class) + public void withholdingFeeUser(Long userId,Long price,Room room){ + WithholdingFeeUserResp resp = accountService.withholdingFeeUser(userId, price); + roomDataCache.incsBlackAmount(room.getRoomId(),resp.getDecrCoin(),resp.getDecrIncomeCoin()); + } + + + /** + * 结算处理 + * @param room + */ + @SneakyThrows + public void processOn(Room room){ + String lock = LockManager.getVideoSettleLock(room.getRoomId()); + RLock clientLock = redissonClient.getLock(lock); + boolean locked = clientLock.isLocked(); + if(locked){ + log.info("正在结算中,稍等!"); + return; + } + boolean lockFlag = clientLock.tryLock(5, TimeUnit.SECONDS); + if(!lockFlag){ + log.info("正在结算中,稍等!"); + return; + } + try { + deal(room); + }finally { + clientLock.unlock(); + } + + } + + /** + * 结算内部整合逻辑 + * @param room + */ private void deal(Room room){ if(room.isCanCall() || room.isSettle()){ return; } - if(!room.isReleaseRes()){ - roomService.releaseRes(Long.valueOf(room.getRoomId())); + if(!room.isReleaseRes()){ // 房间资源是否已经释放 + roomService.releaseRes(room.getRoomId()); return; } - // 结算 - if(!settle(room)){ - return; - } - // 移除在线房间状态 - onlineDataCache.remove(room.getRoomId()); - // 更新支付金额信息 - Map map = new HashMap<>(); - map.put("payCoin", ""); // 实际支付的金额 - map.put("payIncome", ""); - map.put("settleTime", DateUtil.currentSeconds()); - roomDataCache.hMSet(Long.valueOf(room.getRoomId()),map); - // 主叫方通话市场通知 - Long callTime = roomService.getCallTime(room); - Long receiverUserId = room.getReceiverUserData().getId(); - Long callUserId = room.getCallUserData().getId(); - if(callTime > 0){ - ImDataRes imDataRes = ImMsgGen.callNotice(4, receiverUserId, callUserId, callTime); - yunxin.sendTo(room.getCallUserData().getId(),room.getReceiverUserData().getId(),imDataRes); - } - // 收入通知 - if(room != null){ // TODO修改数据 - - } - // 邀请人收入通知 - // 排行榜通知 - } - - /** - * 结算 - */ - private boolean settle(Room room){ Long roomId = room.getRoomId(); // 未通话,无需结算 if(room.getRoomData().getBeginTime() == null){ roomDataCache.hMSet(roomId,"settleTime", DateUtil.currentSeconds()); - return false; + return; } - // 开始结算 - // 呼叫方扣费 - this.computeCallerPay(room); - // 结束结算 - return true; - } - - private void computeCallerPay(Room room) { - RoomData roomData = room.getRoomData(); - UserData callUserData = room.getCallUserData(); - Long callPrice = roomData.getCallPrice(); + VideoSettleResp resp = accountService.videoSettle(room); + // 修改房间缓存 + Map map = new HashMap<>(); + map.put("payCoin",resp.getPayCoin()); + map.put("payIncome",resp.getPayIncome()); + map.put("settleTime", DateUtil.currentSeconds()); + roomDataCache.hMSet(roomId,map); + // 移除在线房间状态 + onlineDataCache.remove(room.getRoomId()); + // 主叫方时长通知 Long callTime = roomService.getCallTime(room); - // 本次支付金额 - Long totalAmount = callPrice * (callTime / 60); - // 补差价 - Long diff = totalAmount - roomData.getPayCoin() + roomData.getPayIncome(); - - Account callAccount = accountService.getByUserId(callUserData.getId()); - Long userAmount = callAccount.getCoin() + callAccount.getIncomeCoin(); - if(diff > 0){ - // 账户上面有余额 - if(userAmount > 0){ - diff = (userAmount > diff) ? diff : userAmount; - log.info("roomid {} 已支付 {} 仍需要补差价:{}", roomData.getRoomId(),roomData.getPayCoin() + roomData.getPayIncome(),diff); - // TODO 对账户进行统一扣费 - - }else{ - log.info("房间需要补差价,但用户余额不足 roomid {} 已支付 {} 仍需要补差价:{}", roomData.getRoomId(),roomData.getPayCoin() + roomData.getPayIncome(),diff); - } - }else{ - // 退钱逻辑 - diff = Math.abs(diff); - // TODO 对账户进行统一退钱逻辑 + Long callId = room.getCallUserData().getId(); + Long receiveId = room.getReceiverUserData().getId(); + if(callTime > 0){ + ImDataRes imDataRes = ImMsgGen.callNotice(4, callId, receiveId, callTime); + yunxin.sendTo(callId,receiveId,imDataRes); + } + // 接收方通知 + Long anchorIncome = resp.getAnchorIncome(); + if(anchorIncome > 0){ + ImDataRes imDataRes = ImMsgGen.callNotice(4, receiveId, callId, callTime); + yunxin.sendTo(receiveId,callId,imDataRes); + // 收入通知 +// ImMsgGen.videoIncome(); +// yunxin.sendTo(receiveId,null,) } } + }