通话逻辑

This commit is contained in:
张良(004796)
2023-12-29 12:45:41 +08:00
parent 973c9545af
commit 244fadd1e6
33 changed files with 746 additions and 69 deletions

View File

@@ -2,44 +2,39 @@ package com.ruoyi.cai.agora;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.cai.util.RestTemplateUtil;
import com.ruoyi.websocket.config.AgoraProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.*;
@Component
public class Agora {
private RestTemplate restTemplate = new RestTemplate();
private final String APP_ID = "appId";
@Autowired
private AgoraProperties agoraProperties;
public String getAuthorizationHeader(){
// 客户 ID
final String customerKey = "Your customer key";
// 客户密钥
final String customerSecret = "Your customer secret";
// 拼接客户 ID 和客户密钥并使用 base64 编码
String plainCredentials = customerKey + ":" + customerSecret;
String plainCredentials = agoraProperties.getKey() + ":" + agoraProperties.getSecret();
String base64Credentials = new String(Base64.getEncoder().encode(plainCredentials.getBytes()));
// 创建 authorization header
String authorizationHeader = "Basic " + base64Credentials;
return authorizationHeader;
return "Basic " + base64Credentials;
}
public List<String> getChannelUsers(String roomId){
String url = "http://api.sd-rtn.com/dev/v1/channel/user/%s/%s";
url = String.format(url, APP_ID, roomId);
url = String.format(url, agoraProperties.getAppId(), roomId);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("Authorization", getAuthorizationHeader());
HttpEntity httpEntity = new HttpEntity<>(headers);
JSONObject jsonobject = restTemplate.getForObject(url,JSONObject.class,httpEntity);
JSONObject jsonobject = RestTemplateUtil.restTemplate.getForObject(url,JSONObject.class,httpEntity);
// "success":true,"data":{"channel_exist":true,"mode":2,"broadcasters":[1001,1025],"audience":[],"audience_total":0}}
if(jsonobject == null){
return Collections.emptyList();
@@ -51,7 +46,7 @@ public class Agora {
public void closeChannel(String roomId){
String url = "http://api.sd-rtn.com/dev/v1/kicking-rule";
Map<String,Object> param = new HashMap<>();
param.put("appid",APP_ID);
param.put("appid",agoraProperties.getAppId());
param.put("cname",roomId);
param.put("time",0);
param.put("privileges",Collections.emptyList());
@@ -59,7 +54,7 @@ public class Agora {
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("Authorization", getAuthorizationHeader());
HttpEntity httpEntity = new HttpEntity<>(param, headers);
restTemplate.postForObject(url, httpEntity, JSONObject.class);
RestTemplateUtil.restTemplate.postForObject(url, httpEntity, JSONObject.class);
}
}

View File

@@ -41,7 +41,7 @@ public class ExecutorConstant {
CPU_NUM << 2,
5,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50),
new ArrayBlockingQueue<>(5),
init("roomThreadPoll-%d"),
new ThreadPoolExecutor.CallerRunsPolicy());
ROOM_EXECUTOR = TtlExecutors.getTtlExecutor(roomExecutor);

View File

@@ -0,0 +1,31 @@
package com.ruoyi.cai.util;
import org.springframework.http.client.BufferingClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class RestTemplateUtil {
public static RestTemplate restTemplate = null;
static {
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
requestFactory.setConnectTimeout(3000);
requestFactory.setReadTimeout(3000);
restTemplate = new RestTemplate(new BufferingClientHttpRequestFactory(requestFactory));
List<HttpMessageConverter<?>> messageConverters = restTemplate.getMessageConverters();
//添加转换器
for (HttpMessageConverter<?> messageConverter : messageConverters) {
if (messageConverter instanceof StringHttpMessageConverter) {
StringHttpMessageConverter converter = (StringHttpMessageConverter) messageConverter;
converter.setDefaultCharset(StandardCharsets.UTF_8);
}
}
}
}

View File

@@ -1,5 +1,6 @@
package com.ruoyi.cai.ws.bean;
import com.ruoyi.cai.ws.constant.UserDataConstant;
import lombok.Data;
@Data
@@ -11,4 +12,12 @@ public class FdCtxData {
private Integer userType;
private Long tarUserId;
private Integer tarUserType;
public boolean isReceiver(){
return userType == UserDataConstant.TYPE_RECEIVER;
}
public boolean isCaller(){
return userType == UserDataConstant.TYPE_CALLER;
}
}

View File

@@ -1,13 +1,8 @@
package com.ruoyi.cai.ws.bean;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.service.CheckConnectionDTO;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
@Data
public class Room {

View File

@@ -11,6 +11,8 @@ public class RoomData {
private String skillName;
private Integer status;
private BigDecimal videoDivide;
private Integer payCoin = 0;
private Integer payIncome = 0;
private Long hangUpTime;
private Long beginTime;

View File

@@ -3,6 +3,7 @@ package com.ruoyi.cai.ws.cache;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.cai.ws.bean.FdCtxData;
import com.ruoyi.cai.ws.constant.RedisConstant;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.SearchStrategy;
import org.springframework.data.redis.core.StringRedisTemplate;
@@ -24,4 +25,13 @@ public class FdCtxDataCache {
redisTemplate.opsForValue().set(getKey(fdCtxData.getSessionKey()),str,5, TimeUnit.DAYS);
}
public FdCtxData getByRoomId(String sessionKey){
String key = getKey(sessionKey);
String s = redisTemplate.opsForValue().get(key);
if(StringUtils.isEmpty(s)){
return null;
}
return JSON.parseObject(s,FdCtxData.class);
}
}

View File

@@ -1,11 +1,14 @@
package com.ruoyi.cai.ws.cache;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.constant.UserDataConstant;
import com.ruoyi.websocket.holder.WebSocketSessionHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -24,6 +27,16 @@ public class RoomCtxCache {
redisTemplate.expire(key,7, TimeUnit.DAYS);
}
public List<String> getSessionKeysByRoomId(String roomId){
String key = getKey(roomId);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
List<String> res = new ArrayList<>();
for (Object o : entries.keySet()) {
res.add(String.valueOf(o));
}
return res;
}
public String getSessionKeyByRoomIdAndUserType(String roomId,Integer userType){
String key = getKey(roomId);
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
@@ -38,6 +51,15 @@ public class RoomCtxCache {
}
public String getSessionKeyReceiverByRoomId(String roomId){
return getSessionKeyByRoomIdAndUserType(roomId,UserDataCache.TYPE_RECEIVER);
return getSessionKeyByRoomIdAndUserType(roomId, UserDataConstant.TYPE_RECEIVER);
}
public String getSessionKeyCallerByRoomId(String roomId){
return getSessionKeyByRoomIdAndUserType(roomId, UserDataConstant.TYPE_CALLER);
}
public void del(String roomId) {
String key = getKey(roomId);
redisTemplate.delete(key);
}
}

View File

@@ -22,17 +22,6 @@ import java.util.Map;
@Component
public class RoomDataCache {
// 房间状态
private final static int STATUS_CREATE = 0; // 刚创建
private final static int STATUS_CALLER_CONNECT = 1; // 呼叫方连接上
private final static int STATUS_CALLER_CANCEL = 2; // 呼叫方取消通话
private final static int STATUS_RECEIVER_CONNECT = 3; // 接收方连接上
private final static int STATUS_CONNECT_CANCEL = 4; // 收到通话未接听
private final static int STATUS_TIMEOUT_CANCEL = 5; // 超时未接听
private final static int STATUS_REFUSE = 6; // 接收方已拒绝
private final static int STATUS_AGREE = 7; // 已接听
private final static int STATUS_HANGUP = 8; // 通话结束
@Autowired
private RedissonClient redissonClient;
@Autowired
@@ -65,10 +54,11 @@ public class RoomDataCache {
"end\n" +
"return redis.call('hMSet', KEYS[1], 'status', 8, 'hangupTime', ARGV[1])";
public void hangUp(String roomId) {
public boolean hangUp(String roomId) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(HANG_UP,Boolean.class);
String currentTime = String.valueOf(System.currentTimeMillis() / 1000);
stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), currentTime);
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)), currentTime);
return BooleanUtils.isTrue(execute);
}
public void setStatus(String roomId, RoomStatusEnums status) {
@@ -87,4 +77,19 @@ public class RoomDataCache {
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)));
return BooleanUtils.isTrue(execute);
}
private final static String SET_STATUS_AGREE =
"local status = tonumber(redis.call('hget', KEYS[1], 'status'))\n" +
"local beginTime = tonumber(redis.call('hget', KEYS[1], 'beginTime'))\n" +
"if status ~= 3 or beginTime > 0 then\n" +
" return 0\n" +
"end\n" +
"return redis.call('hmset', KEYS[1], 'status', 7, 'beginTime', ARGV[1])";
public boolean setStatusAgree(String roomId) {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(SET_STATUS_AGREE,Boolean.class);
String currentTime = String.valueOf(System.currentTimeMillis() / 1000);
Boolean execute = stringRedisTemplate.execute(redisScript, Collections.singletonList(getKey(roomId)),currentTime);
return BooleanUtils.isTrue(execute);
}
}

View File

@@ -3,6 +3,7 @@ package com.ruoyi.cai.ws.cache;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.constant.UserDataConstant;
import com.ruoyi.cai.ws.util.MapGetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
@@ -12,21 +13,19 @@ import java.util.Map;
@Component
public class UserDataCache {
public final static int TYPE_CALLER = 1; // 视频发起者
public final static int TYPE_RECEIVER = 2; // 视频接收者
@Autowired
private StringRedisTemplate redisTemplate;
public String getKey(String roomId,int type){
return String.format(RedisConstant.USER_ROOM_DATA,roomId,type==TYPE_CALLER?"caller":"receiver");
return String.format(RedisConstant.USER_ROOM_DATA,roomId,type== UserDataConstant.TYPE_CALLER?"caller":"receiver");
}
public UserData getCallerUserDataByRoom(String roomId){
return getUserDataByRoom(roomId,TYPE_CALLER);
return getUserDataByRoom(roomId,UserDataConstant.TYPE_CALLER);
}
public UserData getReceiverUserDataByRoom(String roomId){
return getUserDataByRoom(roomId,TYPE_RECEIVER);
return getUserDataByRoom(roomId,UserDataConstant.TYPE_RECEIVER);
}
public UserData getUserDataByRoom(String roomId,int type){
@@ -58,11 +57,11 @@ public class UserDataCache {
}
public void initCaller(UserData callerUserData) {
init(callerUserData,TYPE_CALLER);
init(callerUserData,UserDataConstant.TYPE_CALLER);
}
public void initReceiver(UserData callerUserData) {
init(callerUserData,TYPE_RECEIVER);
init(callerUserData,UserDataConstant.TYPE_RECEIVER);
}
public void hMSet(String roomId,Integer userType,Map<String,Object> data) {

View File

@@ -0,0 +1,8 @@
package com.ruoyi.cai.ws.constant;
public class UserDataConstant {
public final static int TYPE_CALLER = 1; // 视频发起者
public final static int TYPE_RECEIVER = 2; // 视频接收者
}

View File

@@ -10,11 +10,12 @@ import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.cache.*;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.constant.UserDataConstant;
import com.ruoyi.cai.ws.service.CheckConnectionDTO;
import com.ruoyi.cai.ws.service.RoomService;
import com.ruoyi.cai.ws.util.WsExceptionUtil;
import com.ruoyi.websocket.dto.WsRMsgGen;
import com.ruoyi.websocket.handle.IOpenLogic;
import com.ruoyi.websocket.handler.IOpenLogic;
import com.ruoyi.websocket.util.RoomWebSocketUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -80,13 +81,13 @@ public class OpenLogic implements IOpenLogic {
fdCtxData.setRoomId(room.getRoomId());
fdCtxData.setUserId(userId);
if(userId.equals(room.getCallUserData().getId())){
fdCtxData.setUserType(UserDataCache.TYPE_CALLER);
fdCtxData.setUserType(UserDataConstant.TYPE_CALLER);
fdCtxData.setTarUserId(room.getReceiverUserData().getId());
fdCtxData.setTarUserType(UserDataCache.TYPE_RECEIVER);
fdCtxData.setTarUserType(UserDataConstant.TYPE_RECEIVER);
} else {
fdCtxData.setUserType(UserDataCache.TYPE_RECEIVER);
fdCtxData.setUserType(UserDataConstant.TYPE_RECEIVER);
fdCtxData.setTarUserId(room.getCallUserData().getId());
fdCtxData.setTarUserType(UserDataCache.TYPE_CALLER);
fdCtxData.setTarUserType(UserDataConstant.TYPE_CALLER);
}
fdCtxDataCache.save(fdCtxData);
roomCtxCache.addFd(sessionKey,roomId,fdCtxData.getUserType());
@@ -101,7 +102,7 @@ public class OpenLogic implements IOpenLogic {
Map<String,Object> map = new HashMap<>();
map.put("connectTime", DateUtil.currentSeconds());
map.put("heartTime",DateUtil.currentSeconds());
userDataCache.hMSet(room.getRoomId(), UserDataCache.TYPE_CALLER,map);
userDataCache.hMSet(room.getRoomId(), UserDataConstant.TYPE_CALLER,map);
roomDataCache.setStatus(room.getRoomId(),RoomStatusEnums.STATUS_CALLER_CONNECT);
onlineDataCache.add(room.getRoomId());
userService.updateVideoStatus(userId,1);
@@ -142,7 +143,7 @@ public class OpenLogic implements IOpenLogic {
Map<String,Object> map = new HashMap<>();
map.put("connectTime", DateUtil.currentSeconds());
map.put("heartTime",DateUtil.currentSeconds());
userDataCache.hMSet(room.getRoomId(), UserDataCache.TYPE_RECEIVER,map);
userDataCache.hMSet(room.getRoomId(), UserDataConstant.TYPE_RECEIVER,map);
boolean res = roomDataCache.setStatusReceiverConnection(room.getRoomId());
if(!res){
return;

View File

@@ -1,22 +1,25 @@
package com.ruoyi.cai.ws.service;
import cn.hutool.core.date.DateUtil;
import com.ruoyi.cai.domain.CaiAccount;
import com.ruoyi.cai.service.CaiAccountService;
import com.ruoyi.cai.ws.bean.Room;
import com.ruoyi.cai.ws.bean.RoomData;
import com.ruoyi.cai.ws.bean.UserData;
import com.ruoyi.cai.ws.cache.CallerRoomCache;
import com.ruoyi.cai.ws.cache.RoomDataCache;
import com.ruoyi.cai.ws.cache.UserDataCache;
import com.ruoyi.cai.ws.cache.*;
import com.ruoyi.cai.ws.constant.HangUpEnums;
import com.ruoyi.cai.ws.constant.RoomStatusEnums;
import com.ruoyi.cai.ws.util.WsExceptionUtil;
import com.ruoyi.websocket.util.WebSocketUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
@Slf4j
public class RoomService {
@Autowired
private RoomDataCache roomDataCache;
@@ -24,6 +27,12 @@ public class RoomService {
private UserDataCache userDataCache;
@Autowired
private CallerRoomCache callerRoomCache;
@Autowired
private CaiAccountService accountService;
@Autowired
private FdCtxDataCache fdCtxDataCache;
@Autowired
private RoomCtxCache roomCtxCache;
public Room load(String roomId){
Room room = new Room();
@@ -65,6 +74,15 @@ public class RoomService {
}
}
public void closeAllFd(String roomId){
List<String> sessionKeysByRoomId = roomCtxCache.getSessionKeysByRoomId(roomId);
for (String sessionKey : sessionKeysByRoomId) {
WebSocketUtils.close(sessionKey);
}
// 删除房间FD,避免重连
roomCtxCache.del(roomId);
}
public void delCallRoom(Long fromUid) {
callerRoomCache.del(fromUid);
}
@@ -100,4 +118,26 @@ public class RoomService {
return DateUtil.currentSeconds() - beginTime;
}
public Long canCallTime(Room room) {
try {
Long callId = room.getCallUserData().getId();
CaiAccount account = accountService.getByUserId(callId);
if(account == null){
return 0L;
}
RoomData roomData = room.getRoomData();
int blockAmount = roomData.getPayCoin() + roomData.getPayIncome();
long totalAmount = account.getTotalCoin()+account.getIncomeCoin() + blockAmount;
long totalSecond = (totalAmount / roomData.getCallPrice()) / 60;
long useTime = 0;
if(roomData.getBeginTime() != null){
useTime = DateUtil.currentSeconds() - roomData.getBeginTime();
}
return totalSecond - useTime;
}catch (Exception e){
log.error("计算可通话时间失败!",e);
return 0L;
}
}
}