diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/agora/Agora.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/agora/Agora.java new file mode 100644 index 00000000..b9eb3d53 --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/agora/Agora.java @@ -0,0 +1,65 @@ +package com.ruoyi.cai.agora; + +import com.alibaba.fastjson2.JSONArray; +import com.alibaba.fastjson2.JSONObject; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; + +import java.util.*; + +@Component +public class Agora { + + private RestTemplate restTemplate = new RestTemplate(); + + private final String APP_ID = "appId"; + + public String getAuthorizationHeader(){ + // 客户 ID + final String customerKey = "Your customer key"; + // 客户密钥 + final String customerSecret = "Your customer secret"; + + // 拼接客户 ID 和客户密钥并使用 base64 编码 + String plainCredentials = customerKey + ":" + customerSecret; + String base64Credentials = new String(Base64.getEncoder().encode(plainCredentials.getBytes())); + // 创建 authorization header + String authorizationHeader = "Basic " + base64Credentials; + return authorizationHeader; + } + + + public List getChannelUsers(String roomId){ + String url = "http://api.sd-rtn.com/dev/v1/channel/user/%s/%s"; + url = String.format(url, APP_ID, roomId); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("Authorization", getAuthorizationHeader()); + HttpEntity httpEntity = new HttpEntity<>(headers); + JSONObject jsonobject = restTemplate.getForObject(url,JSONObject.class,httpEntity); + // "success":true,"data":{"channel_exist":true,"mode":2,"broadcasters":[1001,1025],"audience":[],"audience_total":0}} + if(jsonobject == null){ + return Collections.emptyList(); + } + JSONArray jsonArray = jsonobject.getJSONObject("data").getJSONArray("broadcasters"); + return jsonArray.toJavaList(String.class); + } + + public void closeChannel(String roomId){ + String url = "http://api.sd-rtn.com/dev/v1/kicking-rule"; + Map param = new HashMap<>(); + param.put("appid",APP_ID); + param.put("cname",roomId); + param.put("time",0); + param.put("privileges",Collections.emptyList()); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.set("Authorization", getAuthorizationHeader()); + HttpEntity httpEntity = new HttpEntity<>(param, headers); + restTemplate.postForObject(url, httpEntity, JSONObject.class); + } + +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/executor/ExecutorConstant.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/executor/ExecutorConstant.java index fbbe2f86..69838e8d 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/executor/ExecutorConstant.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/executor/ExecutorConstant.java @@ -17,6 +17,7 @@ public class ExecutorConstant { public static Executor SYNC_EXECUTOR; public static Executor COMMON_EXECUTOR; + public static Executor ROOM_EXECUTOR; static { ThreadPoolExecutor syncExecutor = initExecutor(CPU_NUM, @@ -36,6 +37,15 @@ public class ExecutorConstant { new ThreadPoolExecutor.CallerRunsPolicy()); COMMON_EXECUTOR = TtlExecutors.getTtlExecutor(commonExecutor); + ThreadPoolExecutor roomExecutor = new ThreadPoolExecutor(CPU_NUM, + CPU_NUM << 2, + 5, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(50), + init("roomThreadPoll-%d"), + new ThreadPoolExecutor.CallerRunsPolicy()); + ROOM_EXECUTOR = TtlExecutors.getTtlExecutor(roomExecutor); + } private static ThreadFactory init(String nameFormat){ diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java index febee942..80d8db4e 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java @@ -1,6 +1,8 @@ package com.ruoyi.cai.ws.processon; import cn.hutool.core.date.DateUtil; +import com.ruoyi.cai.agora.Agora; +import com.ruoyi.cai.executor.ExecutorConstant; import com.ruoyi.cai.service.CaiAnchorService; import com.ruoyi.cai.service.CaiUserService; import com.ruoyi.cai.ws.bean.FdCtxData; @@ -20,6 +22,7 @@ import org.springframework.stereotype.Component; import org.springframework.web.socket.WebSocketSession; import java.util.HashMap; +import java.util.List; import java.util.Map; @Component @@ -40,6 +43,8 @@ public class OpenLogic implements IOpenLogic { private CaiUserService userService; @Autowired private CaiAnchorService anchorService; + @Autowired + private Agora agora; @Override public void processOn(WebSocketSession session) { @@ -104,25 +109,20 @@ public class OpenLogic implements IOpenLogic { } // 已经接通 if(room.isOnline()){ - // 如果视频也掉线了,则重连的时候发送消息提示 -// sgo(function() use ($rs){ -// $users = Agora::getChannelUsers($rs->room->id); -// if (in_array($rs->caller->id, $users)) { -// return; -// } -// -// $rs->sendToCurrent(WsMsgGen::startVideo($rs->room->id, $rs->callTime())) -// ->sendToCurrent(WsMsgGen::sysNotice('重连成功,房间已通话 '.gmdate('H:i:s', $rs->callTime()))) -// ->sendToReceiver(WsMsgGen::sysNotice('对方已重连成功')); -// }); - - Long callTime = roomService.getCallTime(room); - RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.startVideo(room.getRoomId(), callTime)); - RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.sysNotice("重连成功,房间已通话(转换成时分秒) ")); - String sessionKeyReceiver = roomCtxCache.getSessionKeyReceiverByRoomId(room.getRoomId()); - if(StringUtils.isNotEmpty(sessionKeyReceiver)){ - RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.sysNotice("对方已重连成功")); - } + ExecutorConstant.ROOM_EXECUTOR.execute(() -> { + // 如果视频也掉线了,则重连的时候发送消息提示 + List channelUsers = agora.getChannelUsers(room.getRoomId()); + if(channelUsers.contains(userId+"")){ + return; + } + Long callTime = roomService.getCallTime(room); + RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.startVideo(room.getRoomId(), callTime)); + RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.sysNotice("重连成功,房间已通话(转换成时分秒) ")); + String sessionKeyReceiver = roomCtxCache.getSessionKeyReceiverByRoomId(room.getRoomId()); + if(StringUtils.isNotEmpty(sessionKeyReceiver)){ + RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.sysNotice("对方已重连成功")); + } + }); } if(RoomStatusEnums.STATUS_CREATE.getCode().equals(status) || RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(status) || @@ -131,7 +131,7 @@ public class OpenLogic implements IOpenLogic { RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.response(room.getRoomId())); } if(isFirst){ - // 给对方发送呼叫页面 + // 给对方发送呼叫页面 TODO } } public void receiverConnection(String sessionKey,Room room,Long userId){ diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WsExceptionUtil.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WsExceptionUtil.java index 06988e94..f292e6e0 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WsExceptionUtil.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WsExceptionUtil.java @@ -1,16 +1,23 @@ package com.ruoyi.cai.ws.util; +import com.esotericsoftware.minlog.Log; import com.ruoyi.cai.ws.constant.HangUpEnums; +import com.ruoyi.websocket.dto.WsRMsgGen; +import com.ruoyi.websocket.util.RoomWebSocketUtil; +import com.ruoyi.websocket.util.WebSocketUtils; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class WsExceptionUtil { public static void throwException(String sessionKey, String message, HangUpEnums hangUpType, String roomId){ if(hangUpType == null){ hangUpType = HangUpEnums.OTHER; } + RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.hangup(message,roomId,hangUpType.getCode())); + boolean close = WebSocketUtils.close(sessionKey); + if(!close){ + log.warn("ws连接时发现异常:{},sessionKey {} roomid {}", message, sessionKey, roomId); + } } - - public String handUp(String sessionKey,String message,String roomId,HangUpEnums hang){ - return null; - } } diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WsRMsgGen.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WsRMsgGen.java index cfb96dbe..81d8f0c5 100644 --- a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WsRMsgGen.java +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WsRMsgGen.java @@ -49,4 +49,14 @@ public class WsRMsgGen { ok.setMsg("提示!"); return ok; } + + public static WsR hangup(String message, String roomId, Integer hangUpType) { + Map map = new HashMap<>(); + map.put("roomid","roomId"); + map.put("type","hangUpType"); + WsR> ok = WsR.ok(map); + ok.setMethod("hangup"); + ok.setMsg(message); + return ok; + } } diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java index 9f2b014e..73a74fe2 100644 --- a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java @@ -1,6 +1,7 @@ package com.ruoyi.websocket.util; import com.ruoyi.websocket.holder.WebSocketSessionHolder; +import jdk.nashorn.internal.runtime.regexp.JoniRegExp; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -31,6 +32,19 @@ public class WebSocketUtils { sendMessage(session, message); } + public static boolean close(String sessionKey) { + WebSocketSession sessions = WebSocketSessionHolder.getSessions(sessionKey); + if(sessions != null){ + try { + sessions.close(); + return true; + } catch (IOException e) { + log.error("关闭ws失败,sessionKey={}",sessionKey,e); + } + } + return false; + } + public static void sendPongMessage(WebSocketSession session) { sendMessage(session, new PongMessage()); }