diff --git a/ruoyi-cai/pom.xml b/ruoyi-cai/pom.xml index 22e3e5d1..56fca0ff 100644 --- a/ruoyi-cai/pom.xml +++ b/ruoyi-cai/pom.xml @@ -29,7 +29,7 @@ com.alibaba fastjson - 2.0.19 + 2.0.32 com.ruoyi diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java index 9363a282..3b0f717c 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/bean/RoomData.java @@ -12,4 +12,6 @@ public class RoomData { private Integer status; private BigDecimal videoDivide; private Long hangUpTime; + + private Long beginTime; } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/FdCtxDataCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/FdCtxDataCache.java index de32d051..667577d0 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/FdCtxDataCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/FdCtxDataCache.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSON; import com.ruoyi.cai.ws.bean.FdCtxData; import com.ruoyi.cai.ws.constant.RedisConstant; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.SearchStrategy; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomCtxCache.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomCtxCache.java index 0f92478f..d5eb95b7 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomCtxCache.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/cache/RoomCtxCache.java @@ -1,10 +1,12 @@ package com.ruoyi.cai.ws.cache; import com.ruoyi.cai.ws.constant.RedisConstant; +import com.ruoyi.websocket.holder.WebSocketSessionHolder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; +import java.util.Map; import java.util.concurrent.TimeUnit; @Component @@ -21,4 +23,21 @@ public class RoomCtxCache { redisTemplate.opsForHash().putIfAbsent(key,sessionKey,userType); redisTemplate.expire(key,7, TimeUnit.DAYS); } + + public String getSessionKeyByRoomIdAndUserType(String roomId,Integer userType){ + String key = getKey(roomId); + Map entries = redisTemplate.opsForHash().entries(key); + for (Map.Entry entry : entries.entrySet()) { + String sessionKey = String.valueOf(entry.getKey()); + Integer userTypeK = Integer.valueOf(entry.getValue().toString()); + if(WebSocketSessionHolder.existSession(sessionKey) && userTypeK.equals(userType)){ + return sessionKey; + } + } + return null; + } + + public String getSessionKeyReceiverByRoomId(String roomId){ + return getSessionKeyByRoomIdAndUserType(roomId,UserDataCache.TYPE_RECEIVER); + } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java index dc7cc5ff..febee942 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java @@ -11,7 +11,10 @@ import com.ruoyi.cai.ws.constant.RoomStatusEnums; import com.ruoyi.cai.ws.service.CheckConnectionDTO; import com.ruoyi.cai.ws.service.RoomService; import com.ruoyi.cai.ws.util.WsExceptionUtil; +import com.ruoyi.websocket.dto.WsRMsgGen; import com.ruoyi.websocket.handle.IOpenLogic; +import com.ruoyi.websocket.util.RoomWebSocketUtil; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.socket.WebSocketSession; @@ -42,8 +45,10 @@ public class OpenLogic implements IOpenLogic { public void processOn(WebSocketSession session) { Map map = session.getAttributes(); String token = map.get("token").toString(); + String roomId = map.get("roomId").toString(); + Long userId = Long.valueOf(map.get("userId").toString()); // 校验token - process(token,"",0L); + process(token,roomId,userId); } public void process(String sessionKey,String roomId,Long userId){ @@ -99,7 +104,6 @@ public class OpenLogic implements IOpenLogic { } // 已经接通 if(room.isOnline()){ - // TODO // 如果视频也掉线了,则重连的时候发送消息提示 // sgo(function() use ($rs){ // $users = Agora::getChannelUsers($rs->room->id); @@ -111,12 +115,20 @@ public class OpenLogic implements IOpenLogic { // ->sendToCurrent(WsMsgGen::sysNotice('重连成功,房间已通话 '.gmdate('H:i:s', $rs->callTime()))) // ->sendToReceiver(WsMsgGen::sysNotice('对方已重连成功')); // }); + + Long callTime = roomService.getCallTime(room); + RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.startVideo(room.getRoomId(), callTime)); + RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.sysNotice("重连成功,房间已通话(转换成时分秒) ")); + String sessionKeyReceiver = roomCtxCache.getSessionKeyReceiverByRoomId(room.getRoomId()); + if(StringUtils.isNotEmpty(sessionKeyReceiver)){ + RoomWebSocketUtil.sendSendMessage(sessionKey,WsRMsgGen.sysNotice("对方已重连成功")); + } } if(RoomStatusEnums.STATUS_CREATE.getCode().equals(status) || RoomStatusEnums.STATUS_CALLER_CONNECT.getCode().equals(status) || RoomStatusEnums.STATUS_RECEIVER_CONNECT.getCode().equals(status)){ // 给当前会话发送消息 - 连线成功 - + RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.response(room.getRoomId())); } if(isFirst){ // 给对方发送呼叫页面 @@ -138,9 +150,8 @@ public class OpenLogic implements IOpenLogic { userService.updateVideoStatus(userId,1); anchorService.updateVideoStatus(userId,1); } - // 连线成功 -// $rs->sendToCurrent(WsMsgGen::response($this->roomId)) -// ->sendToCurrent(WsMsgGen::updateTip()); + RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.response(room.getRoomId())); + RoomWebSocketUtil.sendSendMessage(sessionKey, WsRMsgGen.updateTip()); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java index a961f6df..42189e53 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/RoomService.java @@ -1,5 +1,6 @@ package com.ruoyi.cai.ws.service; +import cn.hutool.core.date.DateUtil; import com.ruoyi.cai.ws.bean.Room; import com.ruoyi.cai.ws.bean.RoomData; import com.ruoyi.cai.ws.bean.UserData; @@ -86,4 +87,17 @@ public class RoomService { } return null; } + + public Long getCallTime(Room room) { + RoomData roomData = room.getRoomData(); + Long beginTime = roomData.getBeginTime(); + if(beginTime == null){ + return 0L; + } + if(roomData.getHangUpTime() != null){ + return roomData.getHangUpTime() - roomData.getBeginTime(); + } + return DateUtil.currentSeconds() - beginTime; + + } } diff --git a/ruoyi-websocket/pom.xml b/ruoyi-websocket/pom.xml index ff0f011d..ead8c5cc 100644 --- a/ruoyi-websocket/pom.xml +++ b/ruoyi-websocket/pom.xml @@ -26,6 +26,11 @@ org.springframework.boot spring-boot-starter-websocket + + com.alibaba + fastjson + 2.0.32 + diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/constant/WebSocketConstants.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/constant/WebSocketConstants.java index 23fbc11a..4e2b2699 100644 --- a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/constant/WebSocketConstants.java +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/constant/WebSocketConstants.java @@ -9,7 +9,11 @@ public interface WebSocketConstants { /** * websocketSession中的参数的key */ - String LOGIN_USER_KEY = "loginUser"; + String TOKEN = "token"; + + String ROOM_ID = "roomId"; + + String USER_ID = "userId"; /** * 订阅的频道 diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WsR.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WsR.java new file mode 100644 index 00000000..943a4987 --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WsR.java @@ -0,0 +1,109 @@ +package com.ruoyi.websocket.dto; + +import com.ruoyi.common.constant.HttpStatus; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 响应信息主体 + * + * @author Lion Li + */ +@Data +@NoArgsConstructor +public class WsR implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * 成功 + */ + public static final int SUCCESS = 200; + + /** + * 失败 + */ + public static final int FAIL = 500; + + private int code; + + private String method; + + private String msg; + + private T data; + + public static WsR ok() { + return restResult(null, SUCCESS, "操作成功"); + } + + public static WsR ok(T data) { + return restResult(data, SUCCESS, "操作成功"); + } + + public static WsR ok(String msg) { + return restResult(null, SUCCESS, msg); + } + + public static WsR ok(String msg, T data) { + return restResult(data, SUCCESS, msg); + } + + public static WsR fail() { + return restResult(null, FAIL, "操作失败"); + } + + public static WsR fail(String msg) { + return restResult(null, FAIL, msg); + } + + public static WsR fail(T data) { + return restResult(data, FAIL, "操作失败"); + } + + public static WsR fail(String msg, T data) { + return restResult(data, FAIL, msg); + } + + public static WsR fail(int code, String msg) { + return restResult(null, code, msg); + } + + /** + * 返回警告消息 + * + * @param msg 返回内容 + * @return 警告消息 + */ + public static WsR warn(String msg) { + return restResult(null, HttpStatus.WARN, msg); + } + + /** + * 返回警告消息 + * + * @param msg 返回内容 + * @param data 数据对象 + * @return 警告消息 + */ + public static WsR warn(String msg, T data) { + return restResult(data, HttpStatus.WARN, msg); + } + + private static WsR restResult(T data, int code, String msg) { + WsR r = new WsR<>(); + r.setCode(code); + r.setData(data); + r.setMsg(msg); + return r; + } + + public static Boolean isError(WsR ret) { + return !isSuccess(ret); + } + + public static Boolean isSuccess(WsR ret) { + return WsR.SUCCESS == ret.getCode(); + } +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WsRMsgGen.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WsRMsgGen.java new file mode 100644 index 00000000..cfb96dbe --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WsRMsgGen.java @@ -0,0 +1,52 @@ +package com.ruoyi.websocket.dto; + +import org.apache.poi.hssf.record.OldCellRecord; + +import java.util.HashMap; +import java.util.Map; + +public class WsRMsgGen { + + public static WsR response(String roomId){ + Map map = new HashMap<>(); + map.put("roomid",roomId); + WsR> ok = WsR.ok(map); + ok.setMethod("response"); + ok.setMsg("连线成功"); + return ok; + } + + public static WsR startVideo(String roomId,Long duration){ + Map map = new HashMap<>(); + map.put("roomid",roomId); + map.put("duration",duration); + WsR> ok = WsR.ok(map); + ok.setMethod("startVideo"); + ok.setMsg("通话成功!"); + return ok; + } + + public static WsR sysNotice(String content){ + Map map = new HashMap<>(); + map.put("type",1); + map.put("content",content); + map.put("linkType",0); + map.put("linkUrl",null); + map.put("fromUid",0); + map.put("toUid",0); + map.put("cancalltime",0); + WsR> ok = WsR.ok(map); + ok.setMethod("notice"); + ok.setMsg("提示!"); + return ok; + } + + public static WsR updateTip() { + Map map = new HashMap<>(); + map.put("tip","等待对方响应..."); + WsR> ok = WsR.ok(map); + ok.setMethod("updatetip"); + ok.setMsg("提示!"); + return ok; + } +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/handler/PlusWebSocketHandler.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/handler/PlusWebSocketHandler.java index 6971b2fd..00a7369c 100644 --- a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/handler/PlusWebSocketHandler.java +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/handler/PlusWebSocketHandler.java @@ -1,7 +1,6 @@ package com.ruoyi.websocket.handler; -import com.ruoyi.common.core.domain.model.LoginUser; -import com.ruoyi.websocket.dto.WebSocketMessageDto; +import com.ruoyi.websocket.constant.WebSocketConstants; import com.ruoyi.websocket.handle.IOpenLogic; import com.ruoyi.websocket.holder.WebSocketSessionHolder; import com.ruoyi.websocket.util.WebSocketUtils; @@ -11,12 +10,8 @@ import org.springframework.stereotype.Component; import org.springframework.web.socket.*; import org.springframework.web.socket.handler.AbstractWebSocketHandler; -import java.util.Collections; -import java.util.List; import java.util.Map; -import static com.ruoyi.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; - /** * WebSocketHandler 实现类 * @@ -50,12 +45,7 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler { */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); - List userIds = Collections.singletonList(loginUser.getUserId()); - WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto(); - webSocketMessageDto.setSessionKeys(userIds); - webSocketMessageDto.setMessage(message.getPayload()); - WebSocketUtils.publishMessage(webSocketMessageDto); + String token = String.valueOf(session.getAttributes().get(WebSocketConstants.TOKEN)); } @Override @@ -95,9 +85,9 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler { */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { - LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); - WebSocketSessionHolder.removeSession(loginUser.getUserId()); - log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); + String token = String.valueOf(session.getAttributes().get("token")); + WebSocketSessionHolder.removeSession(token); + log.info("[disconnect] sessionId: {},token:{}", session.getId(), token); } /** diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/interceptor/PlusWebSocketInterceptor.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/interceptor/PlusWebSocketInterceptor.java index 435024d6..b087d610 100644 --- a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/interceptor/PlusWebSocketInterceptor.java +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/interceptor/PlusWebSocketInterceptor.java @@ -1,7 +1,5 @@ package com.ruoyi.websocket.interceptor; -import com.ruoyi.common.core.domain.model.LoginUser; -import com.ruoyi.common.helper.LoginHelper; import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; @@ -11,8 +9,6 @@ import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.Map; -import static com.ruoyi.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; - /** * WebSocket握手请求的拦截器 * @@ -33,8 +29,6 @@ public class PlusWebSocketInterceptor implements HandshakeInterceptor { */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) { - LoginUser loginUser = LoginHelper.getLoginUser(); - attributes.put(LOGIN_USER_KEY, loginUser); return true; } diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/listener/WebSocketTopicListener.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/listener/WebSocketTopicListener.java deleted file mode 100644 index 040551d1..00000000 --- a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/listener/WebSocketTopicListener.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.ruoyi.websocket.listener; - -import cn.hutool.core.collection.CollUtil; -import com.ruoyi.websocket.holder.WebSocketSessionHolder; -import com.ruoyi.websocket.util.WebSocketUtils; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.core.Ordered; -import org.springframework.stereotype.Component; - -/** - * WebSocket 主题订阅监听器 - * - * @author zendwang - */ -@Slf4j -@Component -public class WebSocketTopicListener implements ApplicationRunner, Ordered { - - @Override - public void run(ApplicationArguments args) throws Exception { - WebSocketUtils.subscribeMessage((message) -> { - log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage()); - // 如果key不为空就按照key发消息 如果为空就群发 - if (CollUtil.isNotEmpty(message.getSessionKeys())) { - message.getSessionKeys().forEach(key -> { - if (WebSocketSessionHolder.existSession(key)) { - WebSocketUtils.sendMessage(key, message.getMessage()); - } - }); - } else { - WebSocketSessionHolder.getSessionsAll().forEach(key -> { - WebSocketUtils.sendMessage(key, message.getMessage()); - }); - } - }); - log.info("初始化WebSocket主题订阅监听器成功"); - } - - @Override - public int getOrder() { - return -1; - } -} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/RoomWebSocketUtil.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/RoomWebSocketUtil.java new file mode 100644 index 00000000..7ba1b735 --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/RoomWebSocketUtil.java @@ -0,0 +1,15 @@ +package com.ruoyi.websocket.util; + +import com.alibaba.fastjson2.JSON; +import com.ruoyi.common.core.domain.R; +import com.ruoyi.websocket.dto.WsR; + +import java.util.HashMap; +import java.util.Map; + +public class RoomWebSocketUtil { + public static void sendSendMessage(String sessionKey,WsR r){ + WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r)); + } + +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java index 282c76f7..9f2b014e 100644 --- a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java @@ -1,8 +1,5 @@ package com.ruoyi.websocket.util; -import cn.hutool.core.collection.CollUtil; -import com.ruoyi.common.utils.redis.RedisUtils; -import com.ruoyi.websocket.dto.WebSocketMessageDto; import com.ruoyi.websocket.holder.WebSocketSessionHolder; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -13,11 +10,6 @@ import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; - -import static com.ruoyi.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC; /** * 工具类 @@ -39,46 +31,6 @@ public class WebSocketUtils { sendMessage(session, message); } - /** - * 发布订阅的消息 - * - * @param webSocketMessage 消息对象 - */ - public static void publishMessage(WebSocketMessageDto webSocketMessage) { - List unsentSessionKeys = new ArrayList<>(); - // 当前服务内session,直接发送消息 - for (Long sessionKey : webSocketMessage.getSessionKeys()) { - if (WebSocketSessionHolder.existSession(sessionKey)) { - WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage()); - continue; - } - unsentSessionKeys.add(sessionKey); - } - // 不在当前服务内session,发布订阅消息 - if (CollUtil.isNotEmpty(unsentSessionKeys)) { - WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); - broadcastMessage.setMessage(webSocketMessage.getMessage()); - broadcastMessage.setSessionKeys(unsentSessionKeys); - RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { - log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}", - WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage()); - }); - } - } - - /** - * 发布订阅的消息(群发) - * - * @param message 消息内容 - */ - public static void publishAll(String message) { - WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); - broadcastMessage.setMessage(message); - RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { - log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message); - }); - } - public static void sendPongMessage(WebSocketSession session) { sendMessage(session, new PongMessage()); }