通话逻辑

This commit is contained in:
张良(004796)
2023-12-28 22:23:32 +08:00
parent 49922904ec
commit 934a613f11
15 changed files with 245 additions and 122 deletions

View File

@@ -9,7 +9,11 @@ public interface WebSocketConstants {
/**
* websocketSession中的参数的key
*/
String LOGIN_USER_KEY = "loginUser";
String TOKEN = "token";
String ROOM_ID = "roomId";
String USER_ID = "userId";
/**
* 订阅的频道

View File

@@ -0,0 +1,109 @@
package com.ruoyi.websocket.dto;
import com.ruoyi.common.constant.HttpStatus;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 响应信息主体
*
* @author Lion Li
*/
@Data
@NoArgsConstructor
public class WsR<T> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 成功
*/
public static final int SUCCESS = 200;
/**
* 失败
*/
public static final int FAIL = 500;
private int code;
private String method;
private String msg;
private T data;
public static <T> WsR<T> ok() {
return restResult(null, SUCCESS, "操作成功");
}
public static <T> WsR<T> ok(T data) {
return restResult(data, SUCCESS, "操作成功");
}
public static <T> WsR<T> ok(String msg) {
return restResult(null, SUCCESS, msg);
}
public static <T> WsR<T> ok(String msg, T data) {
return restResult(data, SUCCESS, msg);
}
public static <T> WsR<T> fail() {
return restResult(null, FAIL, "操作失败");
}
public static <T> WsR<T> fail(String msg) {
return restResult(null, FAIL, msg);
}
public static <T> WsR<T> fail(T data) {
return restResult(data, FAIL, "操作失败");
}
public static <T> WsR<T> fail(String msg, T data) {
return restResult(data, FAIL, msg);
}
public static <T> WsR<T> fail(int code, String msg) {
return restResult(null, code, msg);
}
/**
* 返回警告消息
*
* @param msg 返回内容
* @return 警告消息
*/
public static <T> WsR<T> warn(String msg) {
return restResult(null, HttpStatus.WARN, msg);
}
/**
* 返回警告消息
*
* @param msg 返回内容
* @param data 数据对象
* @return 警告消息
*/
public static <T> WsR<T> warn(String msg, T data) {
return restResult(data, HttpStatus.WARN, msg);
}
private static <T> WsR<T> restResult(T data, int code, String msg) {
WsR<T> r = new WsR<>();
r.setCode(code);
r.setData(data);
r.setMsg(msg);
return r;
}
public static <T> Boolean isError(WsR<T> ret) {
return !isSuccess(ret);
}
public static <T> Boolean isSuccess(WsR<T> ret) {
return WsR.SUCCESS == ret.getCode();
}
}

View File

@@ -0,0 +1,52 @@
package com.ruoyi.websocket.dto;
import org.apache.poi.hssf.record.OldCellRecord;
import java.util.HashMap;
import java.util.Map;
public class WsRMsgGen {
public static WsR response(String roomId){
Map<String,Object> map = new HashMap<>();
map.put("roomid",roomId);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("response");
ok.setMsg("连线成功");
return ok;
}
public static WsR startVideo(String roomId,Long duration){
Map<String,Object> map = new HashMap<>();
map.put("roomid",roomId);
map.put("duration",duration);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("startVideo");
ok.setMsg("通话成功!");
return ok;
}
public static WsR sysNotice(String content){
Map<String,Object> map = new HashMap<>();
map.put("type",1);
map.put("content",content);
map.put("linkType",0);
map.put("linkUrl",null);
map.put("fromUid",0);
map.put("toUid",0);
map.put("cancalltime",0);
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("notice");
ok.setMsg("提示!");
return ok;
}
public static WsR updateTip() {
Map<String,Object> map = new HashMap<>();
map.put("tip","等待对方响应...");
WsR<Map<String, Object>> ok = WsR.ok(map);
ok.setMethod("updatetip");
ok.setMsg("提示!");
return ok;
}
}

View File

@@ -1,7 +1,6 @@
package com.ruoyi.websocket.handler;
import com.ruoyi.common.core.domain.model.LoginUser;
import com.ruoyi.websocket.dto.WebSocketMessageDto;
import com.ruoyi.websocket.constant.WebSocketConstants;
import com.ruoyi.websocket.handle.IOpenLogic;
import com.ruoyi.websocket.holder.WebSocketSessionHolder;
import com.ruoyi.websocket.util.WebSocketUtils;
@@ -11,12 +10,8 @@ import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static com.ruoyi.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
/**
* WebSocketHandler 实现类
*
@@ -50,12 +45,7 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
List<Long> userIds = Collections.singletonList(loginUser.getUserId());
WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();
webSocketMessageDto.setSessionKeys(userIds);
webSocketMessageDto.setMessage(message.getPayload());
WebSocketUtils.publishMessage(webSocketMessageDto);
String token = String.valueOf(session.getAttributes().get(WebSocketConstants.TOKEN));
}
@Override
@@ -95,9 +85,9 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
WebSocketSessionHolder.removeSession(loginUser.getUserId());
log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
String token = String.valueOf(session.getAttributes().get("token"));
WebSocketSessionHolder.removeSession(token);
log.info("[disconnect] sessionId: {},token:{}", session.getId(), token);
}
/**

View File

@@ -1,7 +1,5 @@
package com.ruoyi.websocket.interceptor;
import com.ruoyi.common.core.domain.model.LoginUser;
import com.ruoyi.common.helper.LoginHelper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
@@ -11,8 +9,6 @@ import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
import static com.ruoyi.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
/**
* WebSocket握手请求的拦截器
*
@@ -33,8 +29,6 @@ public class PlusWebSocketInterceptor implements HandshakeInterceptor {
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
LoginUser loginUser = LoginHelper.getLoginUser();
attributes.put(LOGIN_USER_KEY, loginUser);
return true;
}

View File

@@ -1,45 +0,0 @@
package com.ruoyi.websocket.listener;
import cn.hutool.core.collection.CollUtil;
import com.ruoyi.websocket.holder.WebSocketSessionHolder;
import com.ruoyi.websocket.util.WebSocketUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
/**
* WebSocket 主题订阅监听器
*
* @author zendwang
*/
@Slf4j
@Component
public class WebSocketTopicListener implements ApplicationRunner, Ordered {
@Override
public void run(ApplicationArguments args) throws Exception {
WebSocketUtils.subscribeMessage((message) -> {
log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage());
// 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.getSessionKeys())) {
message.getSessionKeys().forEach(key -> {
if (WebSocketSessionHolder.existSession(key)) {
WebSocketUtils.sendMessage(key, message.getMessage());
}
});
} else {
WebSocketSessionHolder.getSessionsAll().forEach(key -> {
WebSocketUtils.sendMessage(key, message.getMessage());
});
}
});
log.info("初始化WebSocket主题订阅监听器成功");
}
@Override
public int getOrder() {
return -1;
}
}

View File

@@ -0,0 +1,15 @@
package com.ruoyi.websocket.util;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.websocket.dto.WsR;
import java.util.HashMap;
import java.util.Map;
public class RoomWebSocketUtil {
public static void sendSendMessage(String sessionKey,WsR r){
WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r));
}
}

View File

@@ -1,8 +1,5 @@
package com.ruoyi.websocket.util;
import cn.hutool.core.collection.CollUtil;
import com.ruoyi.common.utils.redis.RedisUtils;
import com.ruoyi.websocket.dto.WebSocketMessageDto;
import com.ruoyi.websocket.holder.WebSocketSessionHolder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -13,11 +10,6 @@ import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import static com.ruoyi.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC;
/**
* 工具类
@@ -39,46 +31,6 @@ public class WebSocketUtils {
sendMessage(session, message);
}
/**
* 发布订阅的消息
*
* @param webSocketMessage 消息对象
*/
public static void publishMessage(WebSocketMessageDto webSocketMessage) {
List<Long> unsentSessionKeys = new ArrayList<>();
// 当前服务内session,直接发送消息
for (Long sessionKey : webSocketMessage.getSessionKeys()) {
if (WebSocketSessionHolder.existSession(sessionKey)) {
WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
continue;
}
unsentSessionKeys.add(sessionKey);
}
// 不在当前服务内session,发布订阅消息
if (CollUtil.isNotEmpty(unsentSessionKeys)) {
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(webSocketMessage.getMessage());
broadcastMessage.setSessionKeys(unsentSessionKeys);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}",
WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
});
}
}
/**
* 发布订阅的消息(群发)
*
* @param message 消息内容
*/
public static void publishAll(String message) {
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(message);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
});
}
public static void sendPongMessage(WebSocketSession session) {
sendMessage(session, new PongMessage());
}