diff --git a/pom.xml b/pom.xml index db35adbb..93e893f9 100644 --- a/pom.xml +++ b/pom.xml @@ -331,6 +331,7 @@ ruoyi-oss ruoyi-sms ruoyi-cai + ruoyi-websocket pom diff --git a/ruoyi-websocket/pom.xml b/ruoyi-websocket/pom.xml new file mode 100644 index 00000000..ff0f011d --- /dev/null +++ b/ruoyi-websocket/pom.xml @@ -0,0 +1,31 @@ + + + 4.0.0 + + com.ruoyi + ruoyi-vue-plus + 4.8.2 + + + ruoyi-websocket + + + 8 + 8 + UTF-8 + + + + + com.ruoyi + ruoyi-common + + + org.springframework.boot + spring-boot-starter-websocket + + + + diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/config/WebSocketConfig.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/config/WebSocketConfig.java new file mode 100644 index 00000000..a11520b7 --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/config/WebSocketConfig.java @@ -0,0 +1,59 @@ +package com.ruoyi.websocket.config; + +import cn.hutool.core.util.StrUtil; +import com.ruoyi.websocket.handler.PlusWebSocketHandler; +import com.ruoyi.websocket.interceptor.PlusWebSocketInterceptor; +import com.ruoyi.websocket.listener.WebSocketTopicListener; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.server.HandshakeInterceptor; + +/** + * WebSocket 配置 + * + * @author zendwang + */ +@AutoConfiguration +@ConditionalOnProperty(value = "websocket.enabled", havingValue = "true") +@EnableConfigurationProperties(WebSocketProperties.class) +@EnableWebSocket +public class WebSocketConfig { + + @Bean + public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, + WebSocketHandler webSocketHandler, + WebSocketProperties webSocketProperties) { + if (StrUtil.isBlank(webSocketProperties.getPath())) { + webSocketProperties.setPath("/websocket"); + } + + if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) { + webSocketProperties.setAllowedOrigins("*"); + } + + return registry -> registry + .addHandler(webSocketHandler, webSocketProperties.getPath()) + .addInterceptors(handshakeInterceptor) + .setAllowedOrigins(webSocketProperties.getAllowedOrigins()); + } + + @Bean + public HandshakeInterceptor handshakeInterceptor() { + return new PlusWebSocketInterceptor(); + } + + @Bean + public WebSocketHandler webSocketHandler() { + return new PlusWebSocketHandler(); + } + + @Bean + public WebSocketTopicListener topicListener() { + return new WebSocketTopicListener(); + } +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/config/WebSocketProperties.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/config/WebSocketProperties.java new file mode 100644 index 00000000..8112a1c2 --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/config/WebSocketProperties.java @@ -0,0 +1,26 @@ +package com.ruoyi.websocket.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * WebSocket 配置项 + * + * @author zendwang + */ +@ConfigurationProperties("websocket") +@Data +public class WebSocketProperties { + + private Boolean enabled; + + /** + * 路径 + */ + private String path; + + /** + * 设置访问源地址 + */ + private String allowedOrigins; +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/constant/WebSocketConstants.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/constant/WebSocketConstants.java new file mode 100644 index 00000000..23fbc11a --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/constant/WebSocketConstants.java @@ -0,0 +1,28 @@ +package com.ruoyi.websocket.constant; + +/** + * websocket的常量配置 + * + * @author zendwang + */ +public interface WebSocketConstants { + /** + * websocketSession中的参数的key + */ + String LOGIN_USER_KEY = "loginUser"; + + /** + * 订阅的频道 + */ + String WEB_SOCKET_TOPIC = "global:websocket"; + + /** + * 前端心跳检查的命令 + */ + String PING = "ping"; + + /** + * 服务端心跳恢复的字符串 + */ + String PONG = "pong"; +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WebSocketMessageDto.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WebSocketMessageDto.java new file mode 100644 index 00000000..488a49b4 --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/dto/WebSocketMessageDto.java @@ -0,0 +1,27 @@ +package com.ruoyi.websocket.dto; + +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +/** + * 消息的dto + * + * @author zendwang + */ +@Data +public class WebSocketMessageDto implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 需要推送到的session key 列表 + */ + private List sessionKeys; + + /** + * 需要发送的消息 + */ + private String message; +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/handler/PlusWebSocketHandler.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/handler/PlusWebSocketHandler.java new file mode 100644 index 00000000..f2a8408a --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/handler/PlusWebSocketHandler.java @@ -0,0 +1,103 @@ +package com.ruoyi.websocket.handler; + +import com.ruoyi.common.core.domain.model.LoginUser; +import com.ruoyi.websocket.dto.WebSocketMessageDto; +import com.ruoyi.websocket.holder.WebSocketSessionHolder; +import com.ruoyi.websocket.util.WebSocketUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.*; +import org.springframework.web.socket.handler.AbstractWebSocketHandler; + +import java.util.Collections; +import java.util.List; + +import static com.ruoyi.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; + +/** + * WebSocketHandler 实现类 + * + * @author zendwang + */ +@Slf4j +public class PlusWebSocketHandler extends AbstractWebSocketHandler { + + /** + * 连接成功后 + */ + @Override + public void afterConnectionEstablished(WebSocketSession session) { + LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); + WebSocketSessionHolder.addSession(loginUser.getUserId(), session); + log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); + } + + /** + * 处理发送来的文本消息 + * + * @param session + * @param message + * @throws Exception + */ + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); + List userIds = Collections.singletonList(loginUser.getUserId()); + WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto(); + webSocketMessageDto.setSessionKeys(userIds); + webSocketMessageDto.setMessage(message.getPayload()); + WebSocketUtils.publishMessage(webSocketMessageDto); + } + + @Override + protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { + super.handleBinaryMessage(session, message); + } + + /** + * 心跳监测的回复 + * + * @param session + * @param message + * @throws Exception + */ + @Override + protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { + WebSocketUtils.sendPongMessage(session); + } + + /** + * 连接出错时 + * + * @param session + * @param exception + * @throws Exception + */ + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { + log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage()); + } + + /** + * 连接关闭后 + * + * @param session + * @param status + */ + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { + LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); + WebSocketSessionHolder.removeSession(loginUser.getUserId()); + log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); + } + + /** + * 是否支持分片消息 + * + * @return + */ + @Override + public boolean supportsPartialMessages() { + return false; + } + +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/holder/WebSocketSessionHolder.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/holder/WebSocketSessionHolder.java new file mode 100644 index 00000000..4c9d9fe3 --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/holder/WebSocketSessionHolder.java @@ -0,0 +1,42 @@ +package com.ruoyi.websocket.holder; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.springframework.web.socket.WebSocketSession; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * WebSocketSession 用于保存当前所有在线的会话信息 + * + * @author zendwang + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class WebSocketSessionHolder { + + private static final Map USER_SESSION_MAP = new ConcurrentHashMap<>(); + + public static void addSession(Long sessionKey, WebSocketSession session) { + USER_SESSION_MAP.put(sessionKey, session); + } + + public static void removeSession(Long sessionKey) { + if (USER_SESSION_MAP.containsKey(sessionKey)) { + USER_SESSION_MAP.remove(sessionKey); + } + } + + public static WebSocketSession getSessions(Long sessionKey) { + return USER_SESSION_MAP.get(sessionKey); + } + + public static Set getSessionsAll() { + return USER_SESSION_MAP.keySet(); + } + + public static Boolean existSession(Long sessionKey) { + return USER_SESSION_MAP.containsKey(sessionKey); + } +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/interceptor/PlusWebSocketInterceptor.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/interceptor/PlusWebSocketInterceptor.java new file mode 100644 index 00000000..e7074765 --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/interceptor/PlusWebSocketInterceptor.java @@ -0,0 +1,51 @@ +package com.ruoyi.websocket.interceptor; + +import com.ruoyi.common.core.domain.model.LoginUser; +import com.ruoyi.common.helper.LoginHelper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.HandshakeInterceptor; + +import java.util.Map; + +import static com.ruoyi.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; + +/** + * WebSocket握手请求的拦截器 + * + * @author zendwang + */ +@Slf4j +public class PlusWebSocketInterceptor implements HandshakeInterceptor { + + /** + * 握手前 + * + * @param request request + * @param response response + * @param wsHandler wsHandler + * @param attributes attributes + * @return 是否握手成功 + */ + @Override + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) { + LoginUser loginUser = LoginHelper.getLoginUser(); + attributes.put(LOGIN_USER_KEY, loginUser); + return true; + } + + /** + * 握手后 + * + * @param request request + * @param response response + * @param wsHandler wsHandler + * @param exception 异常 + */ + @Override + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { + + } +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/listener/WebSocketTopicListener.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/listener/WebSocketTopicListener.java new file mode 100644 index 00000000..d5f7e41d --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/listener/WebSocketTopicListener.java @@ -0,0 +1,43 @@ +package com.ruoyi.websocket.listener; + +import cn.hutool.core.collection.CollUtil; +import com.ruoyi.websocket.holder.WebSocketSessionHolder; +import com.ruoyi.websocket.util.WebSocketUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.Ordered; + +/** + * WebSocket 主题订阅监听器 + * + * @author zendwang + */ +@Slf4j +public class WebSocketTopicListener implements ApplicationRunner, Ordered { + + @Override + public void run(ApplicationArguments args) throws Exception { + WebSocketUtils.subscribeMessage((message) -> { + log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage()); + // 如果key不为空就按照key发消息 如果为空就群发 + if (CollUtil.isNotEmpty(message.getSessionKeys())) { + message.getSessionKeys().forEach(key -> { + if (WebSocketSessionHolder.existSession(key)) { + WebSocketUtils.sendMessage(key, message.getMessage()); + } + }); + } else { + WebSocketSessionHolder.getSessionsAll().forEach(key -> { + WebSocketUtils.sendMessage(key, message.getMessage()); + }); + } + }); + log.info("初始化WebSocket主题订阅监听器成功"); + } + + @Override + public int getOrder() { + return -1; + } +} diff --git a/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java new file mode 100644 index 00000000..471d65bf --- /dev/null +++ b/ruoyi-websocket/src/main/java/com/ruoyi/websocket/util/WebSocketUtils.java @@ -0,0 +1,110 @@ +package com.ruoyi.websocket.util; + +import cn.hutool.core.collection.CollUtil; +import com.ruoyi.common.utils.redis.RedisUtils; +import com.ruoyi.websocket.dto.WebSocketMessageDto; +import com.ruoyi.websocket.holder.WebSocketSessionHolder; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.PongMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import static com.ruoyi.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC; + +/** + * 工具类 + * + * @author zendwang + */ +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class WebSocketUtils { + + /** + * 发送消息 + * + * @param sessionKey session主键 一般为用户id + * @param message 消息文本 + */ + public static void sendMessage(Long sessionKey, String message) { + WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey); + sendMessage(session, message); + } + + /** + * 订阅消息 + * + * @param consumer 自定义处理 + */ + public static void subscribeMessage(Consumer consumer) { + RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer); + } + + /** + * 发布订阅的消息 + * + * @param webSocketMessage 消息对象 + */ + public static void publishMessage(WebSocketMessageDto webSocketMessage) { + List unsentSessionKeys = new ArrayList<>(); + // 当前服务内session,直接发送消息 + for (Long sessionKey : webSocketMessage.getSessionKeys()) { + if (WebSocketSessionHolder.existSession(sessionKey)) { + WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage()); + continue; + } + unsentSessionKeys.add(sessionKey); + } + // 不在当前服务内session,发布订阅消息 + if (CollUtil.isNotEmpty(unsentSessionKeys)) { + WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); + broadcastMessage.setMessage(webSocketMessage.getMessage()); + broadcastMessage.setSessionKeys(unsentSessionKeys); + RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { + log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}", + WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage()); + }); + } + } + + /** + * 发布订阅的消息(群发) + * + * @param message 消息内容 + */ + public static void publishAll(String message) { + WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); + broadcastMessage.setMessage(message); + RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { + log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message); + }); + } + + public static void sendPongMessage(WebSocketSession session) { + sendMessage(session, new PongMessage()); + } + + public static void sendMessage(WebSocketSession session, String message) { + sendMessage(session, new TextMessage(message)); + } + + private static void sendMessage(WebSocketSession session, WebSocketMessage message) { + if (session == null || !session.isOpen()) { + log.warn("[send] session会话已经关闭"); + } else { + try { + session.sendMessage(message); + } catch (IOException e) { + log.error("[send] session({}) 发送消息({}) 异常", session, message, e); + } + } + } +}