From 447ad8923304b150d2ab77d6e6410442dd754435 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E8=89=AF=28004796=29?= Date: Tue, 16 Apr 2024 19:23:14 +0800 Subject: [PATCH] 123 --- .../com/ruoyi/find/SendMessageHttpImpl.java | 12 +++ ruoyi-cai/pom.xml | 6 ++ .../java/com/ruoyi/cai/mq/AmqpWsProducer.java | 1 + .../main/java/com/ruoyi/cai/trd/Agora.java | 10 +- .../cai/ws/handler/RoomWebSocketHandler.java | 15 ++- .../cai/ws/holder/WebSocketSessionHolder.java | 26 ++++- .../interceptor/PlusWebSocketInterceptor.java | 1 + .../cai/ws/manager/WebSocketManager.java | 21 ++-- .../com/ruoyi/cai/ws/processon/OpenLogic.java | 2 +- .../ruoyi/cai/ws/service/SettleService.java | 25 +++-- .../ruoyi/cai/ws/util/RoomWebSocketUtil.java | 25 ++++- .../com/ruoyi/cai/ws/util/SendMessageI.java | 6 ++ .../com/ruoyi/cai/ws/util/SessionObj.java | 9 ++ .../com/ruoyi/cai/ws/util/WebSocketUtils.java | 4 +- .../com/ruoyi/consumer/RedisConsumer.java | 57 ++++++++++ .../consumer/RoomCheckDelayMqConsumer.java | 21 ++-- .../consumer/RoomSettleDelayMqConsumer.java | 3 +- .../com/ruoyi/consumer/SendMessageWsImpl.java | 27 +++++ .../main/java/com/ruoyi/job/RoomCheckJob.java | 3 +- .../java/com/ruoyi/job/RoomCheckJobBack.java | 73 +++++++++++++ .../java/com/ruoyi/job/RoomKouQianJobBak.java | 101 ++++++++++++++++++ .../{job => service}/RoomCheckJobService.java | 10 +- .../src/main/resources/application-dev.yml | 20 ++-- 23 files changed, 422 insertions(+), 56 deletions(-) create mode 100644 ruoyi-admin/src/main/java/com/ruoyi/find/SendMessageHttpImpl.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SendMessageI.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SessionObj.java create mode 100644 ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RedisConsumer.java create mode 100644 ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/SendMessageWsImpl.java create mode 100644 ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobBack.java create mode 100644 ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomKouQianJobBak.java rename ruoyi-websocket-boot/src/main/java/com/ruoyi/{job => service}/RoomCheckJobService.java (96%) diff --git a/ruoyi-admin/src/main/java/com/ruoyi/find/SendMessageHttpImpl.java b/ruoyi-admin/src/main/java/com/ruoyi/find/SendMessageHttpImpl.java new file mode 100644 index 00000000..7b28782c --- /dev/null +++ b/ruoyi-admin/src/main/java/com/ruoyi/find/SendMessageHttpImpl.java @@ -0,0 +1,12 @@ +package com.ruoyi.find; + +import com.ruoyi.cai.ws.util.SendMessageI; +import org.springframework.stereotype.Component; + +@Component +public class SendMessageHttpImpl implements SendMessageI { + + @Override + public void sendMessage(String sessionKey, String message){ + } +} diff --git a/ruoyi-cai/pom.xml b/ruoyi-cai/pom.xml index e559ba56..55fcc9c6 100644 --- a/ruoyi-cai/pom.xml +++ b/ruoyi-cai/pom.xml @@ -30,6 +30,12 @@ org.springframework.boot spring-boot-starter-websocket + + + org.springframework.boot + spring-boot-starter-tomcat + + com.ruoyi diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpWsProducer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpWsProducer.java index 6600822e..16ac555d 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpWsProducer.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpWsProducer.java @@ -21,6 +21,7 @@ public class AmqpWsProducer { }); } + @Deprecated public void sendRoomSettleDelay(String message, Integer timeout){ rabbitTemplate.convertAndSend(RoomSettleDelayWsMqConstant.EXCHANGE_NAME, RoomSettleDelayWsMqConstant.ROUTING_KEY, diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/Agora.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/Agora.java index 5ec2587f..f256c4d3 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/Agora.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/Agora.java @@ -42,8 +42,14 @@ public class Agora { if(jsonobject == null){ return Collections.emptyList(); } - JSONArray jsonArray = jsonobject.getJSONObject("data").getJSONArray("broadcasters"); - return jsonArray.toJavaList(String.class); + JSONObject data = jsonobject.getJSONObject("data"); + if(data != null){ + JSONArray jsonArray = data.getJSONArray("broadcasters"); + if(jsonArray != null){ + return jsonArray.toJavaList(String.class); + } + } + return new ArrayList<>(); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java index 5bca70a4..0362afdc 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java @@ -119,6 +119,13 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler { @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage()); + try{ + if(session.isOpen()){ + session.close(); + } + }catch (Exception e){ + log.error(e.getMessage()); + } } /** @@ -133,7 +140,13 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler { // TODO 连接关闭 是否要删除fd的关系 WebSocketSessionHolder.removeSession(session.getId()); log.info("[disconnect] sessionId: {},token:{}", session.getId(), token); - session.close(); + try{ + if(session.isOpen()){ + session.close(); + } + }catch (Exception e){ + log.error(e.getMessage()); + } } /** diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/holder/WebSocketSessionHolder.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/holder/WebSocketSessionHolder.java index ff74dd2c..193e7c93 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/holder/WebSocketSessionHolder.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/holder/WebSocketSessionHolder.java @@ -4,6 +4,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.springframework.web.socket.WebSocketSession; +import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -17,19 +18,36 @@ import java.util.concurrent.ConcurrentHashMap; public class WebSocketSessionHolder { private static final Map USER_SESSION_MAP = new ConcurrentHashMap<>(); + private static final Map USER_SESSION_ID_MAP = new ConcurrentHashMap<>(); public static int size(){ return USER_SESSION_MAP.size(); } - public static void addSession(String sessionKey, WebSocketSession session) { + public static void addSession(String sessionKey, WebSocketSession session,Long userId) { USER_SESSION_MAP.put(sessionKey, session); + if(USER_SESSION_ID_MAP.containsKey(userId)){ // T人动作 + String sessionKeyOld = USER_SESSION_ID_MAP.get(userId); + if(sessionKeyOld != null){ + WebSocketSession webSocketSession = USER_SESSION_MAP.get(sessionKeyOld); + if(webSocketSession != null){ + if(webSocketSession.isOpen()){ + try { + webSocketSession.close(); + } catch (IOException e) { + // ignore + } + } + removeSession(sessionKeyOld); + } + + } + } + USER_SESSION_ID_MAP.put(userId,sessionKey); } public static void removeSession(String sessionKey) { - if (USER_SESSION_MAP.containsKey(sessionKey)) { - USER_SESSION_MAP.remove(sessionKey); - } + USER_SESSION_MAP.remove(sessionKey); } public static WebSocketSession getSessions(String sessionKey) { diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/interceptor/PlusWebSocketInterceptor.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/interceptor/PlusWebSocketInterceptor.java index 814089cf..d76d9211 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/interceptor/PlusWebSocketInterceptor.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/interceptor/PlusWebSocketInterceptor.java @@ -29,6 +29,7 @@ public class PlusWebSocketInterceptor implements HandshakeInterceptor { */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) { + return true; } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java index 467f9b92..a01ec90e 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java @@ -21,6 +21,8 @@ import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.concurrent.TimeUnit; + @Component public class WebSocketManager { @Autowired @@ -64,14 +66,21 @@ public class WebSocketManager { return room.getRoomId(); } RLock lock = redissonClient.getLock(String.format(RedisConstant.INIT_ROOM_LOCK, userCall.getFromUid(), userCall.getToUid())); - boolean b = lock.tryLock(); - if(!b){ - throw new ServiceException("系统繁忙"); - } + boolean b; try { + b = lock.tryLock(1, TimeUnit.MINUTES); + if(!b){ + throw new ServiceException("系统繁忙"); + } roomId = initRoom(userCall); - }finally { - lock.unlock(); + } catch (InterruptedException e) { + throw new ServiceException("系统繁忙"); + } finally { + try { + lock.unlock(); + }catch (Exception e){ + // ignore + } } return roomId; } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java index c3eb40f0..fa64326f 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java @@ -71,7 +71,7 @@ public class OpenLogic { WsExceptionUtil.throwExceptionFast(session,"无效token"); return; } - WebSocketSessionHolder.addSession(session.getId(), session); + WebSocketSessionHolder.addSession(session.getId(), session, wsToken.getUserId()); Long roomId = wsToken.getRoomId(); Long userId = wsToken.getUserId(); Room room = roomService.load(roomId); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java index f460d044..baaa5e78 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java @@ -27,6 +27,7 @@ import org.springframework.transaction.annotation.Transactional; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; @Component @Slf4j @@ -99,7 +100,7 @@ public class SettleService { /** * 结算处理 */ - public SettleResp processOn(Long roomId){ + public SettleResp processOn(Long roomId) { Room room = roomService.load(roomId); if(room == null){ log.warn("房间不存在,无需结算 roomId={}",roomId); @@ -112,17 +113,23 @@ public class SettleService { log.info("正在结算中,稍等!"); return SettleResp.builder().nextRun(true).build(); } - boolean lockFlag = clientLock.tryLock(); - if(!lockFlag){ - log.info("正在结算中,稍等!"); - return SettleResp.builder().nextRun(true).build(); - } + boolean lockFlag = false; try { + lockFlag = clientLock.tryLock(5,60, TimeUnit.SECONDS); + if(!lockFlag){ + log.info("正在结算中,稍等!"); + return SettleResp.builder().nextRun(true).build(); + } return deal(room); - }finally { - clientLock.unlock(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + try { + clientLock.unlock(); + }catch (Exception e){ + // ignore + } } - } @Autowired diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/RoomWebSocketUtil.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/RoomWebSocketUtil.java index eddecdf2..72b0e119 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/RoomWebSocketUtil.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/RoomWebSocketUtil.java @@ -2,22 +2,41 @@ package com.ruoyi.cai.ws.util; import com.alibaba.fastjson2.JSON; import com.ruoyi.cai.ws.dto.WsR; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import org.springframework.web.socket.WebSocketSession; +import javax.annotation.PostConstruct; import java.util.List; +@Component public class RoomWebSocketUtil { + + @Autowired + private SendMessageI sendMessage; + + private static SendMessageI sendMessageStatic; + + @PostConstruct + public void init(){ + sendMessageStatic = sendMessage; + } + + public static void sendSendMessage(String sessionKey, WsR r){ - WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r)); + sendMessageStatic.sendMessage(sessionKey, JSON.toJSONString(r)); +// WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r)); } public static void sendSendMessage(WebSocketSession sessionKey, WsR r){ - WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r)); + sendMessageStatic.sendMessage(sessionKey.getId(), JSON.toJSONString(r)); +// WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r)); } public static void sendSendMessage(List sessionKey, WsR r){ for (String s : sessionKey) { - WebSocketUtils.sendMessage(s, JSON.toJSONString(r)); + sendMessageStatic.sendMessage(s, JSON.toJSONString(r)); +// WebSocketUtils.sendMessage(s, JSON.toJSONString(r)); } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SendMessageI.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SendMessageI.java new file mode 100644 index 00000000..357df91e --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SendMessageI.java @@ -0,0 +1,6 @@ +package com.ruoyi.cai.ws.util; + + +public interface SendMessageI { + void sendMessage(String sessionKey, String message); +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SessionObj.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SessionObj.java new file mode 100644 index 00000000..11a7cd25 --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SessionObj.java @@ -0,0 +1,9 @@ +package com.ruoyi.cai.ws.util; + +import lombok.Data; + +@Data +public class SessionObj { + private String sessionKey; + private String data; +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WebSocketUtils.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WebSocketUtils.java index 78f7c1b7..9665e219 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WebSocketUtils.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WebSocketUtils.java @@ -45,7 +45,9 @@ public class WebSocketUtils { public static boolean close(WebSocketSession session) { if(session != null){ try { - session.close(); + if(session.isOpen()){ + session.close(); + } return true; } catch (IOException e) { log.error("关闭ws失败,session={}",session,e); diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RedisConsumer.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RedisConsumer.java new file mode 100644 index 00000000..9eec3b50 --- /dev/null +++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RedisConsumer.java @@ -0,0 +1,57 @@ +package com.ruoyi.consumer; + +import com.ruoyi.cai.ws.util.SessionObj; +import com.ruoyi.cai.ws.util.WebSocketUtils; +import org.redisson.api.RBlockingQueue; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@Component +public class RedisConsumer { + @Autowired + private RedissonClient redissonClient; + + + @PostConstruct + public void init(){ + ExecutorService executorService = Executors.newFixedThreadPool(20); + for (int i = 0; i < 20; i++) { + executorService.execute(() -> { + while (true){ + try { + System.out.println(); + run(); + System.out.println(); + }catch (Exception e){ + + } + } + }); + } + } + + private void run() throws InterruptedException { + RBlockingQueue sessionPush = redissonClient.getBlockingQueue("sessionPush"); + SessionObj poll = sessionPush.take(); + RLock lock = redissonClient.getLock("sessionPushLock:" + poll.getSessionKey()); + try { + lock.tryLock(5,5, TimeUnit.SECONDS); + WebSocketUtils.sendMessage(poll.getSessionKey(), poll.getData()); + } catch (InterruptedException e) { + // + } finally { + try { + lock.unlock(); + }catch (Exception e){ + // + } + } + } +} diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomCheckDelayMqConsumer.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomCheckDelayMqConsumer.java index 4e26ebae..ca3259b8 100644 --- a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomCheckDelayMqConsumer.java +++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomCheckDelayMqConsumer.java @@ -3,7 +3,7 @@ package com.ruoyi.consumer; import com.ruoyi.cai.mq.AmqpWsProducer; import com.ruoyi.cai.mq.constant.ws.RoomCheckDelayWsMqConstant; import com.ruoyi.cai.ws.job.JobResp; -import com.ruoyi.job.RoomCheckJobService; +import com.ruoyi.service.RoomCheckJobService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; @@ -17,22 +17,21 @@ public class RoomCheckDelayMqConsumer { @Autowired private RoomCheckJobService roomCheckJobService; - @RabbitListener(queues = RoomCheckDelayWsMqConstant.QUEUE_NAME - ,containerFactory = "customContainerFactory") + @RabbitListener(queues = RoomCheckDelayWsMqConstant.QUEUE_NAME, concurrency = "30") public void sendRoomCheck(String roomIdStr) { - log.info("开始执行房间检测: roomId={}",roomIdStr); + log.info("开始执行房间检测: roomId={}", roomIdStr); try { Long roomId = Long.valueOf(roomIdStr); JobResp resp = roomCheckJobService.checkRoom(roomId); - if(resp.isNextCreateJob()){ + if (resp.isNextCreateJob()) { // 1分钟后继续执行 - log.info("1分钟后继续执行房间检测: roomId={}",roomIdStr); - amqpWsProducer.sendRoomCheckDelay(roomIdStr,60); + log.info("1分钟后继续执行房间检测: roomId={}", roomIdStr); + amqpWsProducer.sendRoomCheckDelay(roomIdStr, 60); } - }catch (Exception e){ - log.error("每分钟定时检测房间失败! roomId={}",roomIdStr); - }finally { - log.info("结束执行房间检测: roomId={}",roomIdStr); + } catch (Exception e) { + log.error("每分钟定时检测房间失败! roomId={}", roomIdStr); + } finally { + log.info("结束执行房间检测: roomId={}", roomIdStr); } } diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomSettleDelayMqConsumer.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomSettleDelayMqConsumer.java index d5d37637..f4da2086 100644 --- a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomSettleDelayMqConsumer.java +++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomSettleDelayMqConsumer.java @@ -16,8 +16,7 @@ public class RoomSettleDelayMqConsumer { @Autowired private AmqpWsProducer amqpWsProducer; - @RabbitListener(queues = RoomSettleDelayWsMqConstant.QUEUE_NAME - ,containerFactory = "customContainerFactory") + @RabbitListener(queues = RoomSettleDelayWsMqConstant.QUEUE_NAME,concurrency = "30") public void checkTimeOutMq(String message) { log.info("开始执行预扣费: " + message); try { diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/SendMessageWsImpl.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/SendMessageWsImpl.java new file mode 100644 index 00000000..11c869a1 --- /dev/null +++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/SendMessageWsImpl.java @@ -0,0 +1,27 @@ +package com.ruoyi.consumer; + +import com.ruoyi.cai.ws.util.SendMessageI; +import com.ruoyi.cai.ws.util.SessionObj; +import org.redisson.api.RBlockingQueue; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class SendMessageWsImpl implements SendMessageI { + @Autowired + private RedissonClient redissonClient; + + @Override + public void sendMessage(String sessionKey, String message){ + RBlockingQueue sessionPush = redissonClient.getBlockingQueue("sessionPush"); + SessionObj sessionObj = new SessionObj(); + sessionObj.setSessionKey(sessionKey); + sessionObj.setData(message); + try { + sessionPush.put(sessionObj); + } catch (InterruptedException e) { + // + } + } +} diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJob.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJob.java index 0777cd41..7ad4c061 100644 --- a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJob.java +++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJob.java @@ -1,6 +1,7 @@ package com.ruoyi.job; import com.ruoyi.cai.ws.cache.OnlineDataCache; +import com.ruoyi.service.RoomCheckJobService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; @@ -20,7 +21,7 @@ public class RoomCheckJob { /** * 每 7分钟执行一次 */ - @Scheduled(fixedDelay = 7,timeUnit = TimeUnit.MINUTES) + @Scheduled(fixedDelay = 5,timeUnit = TimeUnit.MINUTES) public void run(){ Set all = onlineDataCache.getAll(); for (String roomIdS : all) { diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobBack.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobBack.java new file mode 100644 index 00000000..5fa2aa11 --- /dev/null +++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobBack.java @@ -0,0 +1,73 @@ +/* +package com.ruoyi.job; + +import com.alibaba.ttl.threadpool.TtlExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.ruoyi.cai.ws.cache.OnlineDataCache; +import com.ruoyi.service.RoomCheckJobService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Set; +import java.util.concurrent.*; + +@Component +@Slf4j +public class RoomCheckJobBack { + @Autowired + private OnlineDataCache onlineDataCache; + @Autowired + private RoomCheckJobService roomCheckJobService; + + private final static int CPU_NUM = 100; + + public static Executor CHECK_ROOM_EXECUTOR; + + static { + // 丢弃队列 + ThreadPoolExecutor syncExecutor = new ThreadPoolExecutor(CPU_NUM, + CPU_NUM * 2, + 5, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + init("roomCheckJob-%d"), + new ThreadPoolExecutor.DiscardPolicy()); + CHECK_ROOM_EXECUTOR = TtlExecutors.getTtlExecutor(syncExecutor); + + } + + private static ThreadFactory init(String nameFormat){ + return new ThreadFactoryBuilder().setNameFormat(nameFormat).build(); + } + + private static ThreadPoolExecutor initExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit timeUnit, + BlockingQueue workQueue, String nameFormat){ + return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue, + init(nameFormat)); + } + */ +/** + * 每 1分钟执行一次 + *//* + + @Scheduled(fixedDelay = 3,timeUnit = TimeUnit.MINUTES) +// @PostConstruct + public void run(){ + while (true) { + Set all = onlineDataCache.getAll(); + for (String roomIdS : all) { + try { + Long roomId = Long.valueOf(roomIdS); + CHECK_ROOM_EXECUTOR.execute(() -> { + roomCheckJobService.checkRoom(roomId); + }); + }catch (Exception e){ + log.info("房间检测失败!需要开发排查问题 roomId={}", roomIdS, e); + } + } + } + } +} +*/ diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomKouQianJobBak.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomKouQianJobBak.java new file mode 100644 index 00000000..45c4c5d9 --- /dev/null +++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomKouQianJobBak.java @@ -0,0 +1,101 @@ +/* +package com.ruoyi.job; + +import cn.hutool.core.util.RandomUtil; +import com.alibaba.ttl.threadpool.TtlExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.ruoyi.cai.ws.cache.OnlineDataCache; +import com.ruoyi.cai.ws.constant.RedisConstant; +import com.ruoyi.cai.ws.service.SettleService; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RBucket; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.*; + +@Component +@Slf4j +public class RoomKouQianJobBak { + + @Autowired + private OnlineDataCache onlineDataCache; + @Autowired + private SettleService settleService; + @Autowired + private RedissonClient redissonClient; + + private final static int CPU_NUM = 300; + + public static Executor CHECK_ROOM_EXECUTOR; + + static { + // 丢弃队列 + ThreadPoolExecutor syncExecutor = new ThreadPoolExecutor(CPU_NUM, + CPU_NUM * 2, + 5, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + init("RoomKouQianJob-%d"), + new ThreadPoolExecutor.DiscardPolicy()); + CHECK_ROOM_EXECUTOR = TtlExecutors.getTtlExecutor(syncExecutor); + + } + + private static ThreadFactory init(String nameFormat) { + return new ThreadFactoryBuilder().setNameFormat(nameFormat).build(); + } + + private static ThreadPoolExecutor initExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit timeUnit, + BlockingQueue workQueue, String nameFormat) { + return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue, + init(nameFormat)); + } + + */ +/** + * 每 1分钟执行一次 + *//* + +// @Scheduled(fixedDelay = 7,timeUnit = TimeUnit.MINUTES) + @PostConstruct + public void run() { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.execute(() -> { + while (true) { + Set all = onlineDataCache.getAll(); + for (String roomIdS : all) { + try { + Long roomId = Long.valueOf(roomIdS); + CHECK_ROOM_EXECUTOR.execute(() -> { + RBucket bucket = redissonClient.getBucket(RedisConstant.REDIS_P + "koufeiLock-" + roomId); + boolean lock = bucket.setIfExists(RandomUtil.randomString(10), Duration.ofMinutes(1)); + if (!lock) { + return; + } + boolean success = settleService.withholdingFee(roomId); + if (!success) { + bucket.delete(); + } + }); + } catch (Exception e) { + log.info("房间检测失败!需要开发排查问题 roomId={}", roomIdS, e); + } + } + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + + } + + +} +*/ diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobService.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/service/RoomCheckJobService.java similarity index 96% rename from ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobService.java rename to ruoyi-websocket-boot/src/main/java/com/ruoyi/service/RoomCheckJobService.java index b32e3c1f..4b52922d 100644 --- a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobService.java +++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/service/RoomCheckJobService.java @@ -1,4 +1,4 @@ -package com.ruoyi.job; +package com.ruoyi.service; import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSON; @@ -48,10 +48,10 @@ public class RoomCheckJobService { return JobResp.builder().nextCreateJob(false).build(); } // 检查是否三分钟没有接听 - log.info("调试日志:检查是否三分钟没有接听 roomId={}" , roomId); + log.info("调试日志:检查是否2分钟没有接听 roomId={}" , roomId); JobResp resp = this.checkRoomCallerTimeOut(room); if(!resp.isNextCreateJob()){ - log.info("调试日志:三分钟没有接听,可能关闭了 roomId={}" , roomId); + log.info("调试日志:2分钟没有接听,可能关闭了 roomId={}" , roomId); return resp; } // 检查心跳 @@ -126,9 +126,9 @@ public class RoomCheckJobService { Long connectTimeCaller = callUserData.getConnectTime(); Long connectTimeReceiver = receiverUserData.getConnectTime(); boolean timeOut = false; - if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){ + if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 120){ timeOut = true; - }else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){ + }else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 120){ timeOut = true; } if(timeOut){ diff --git a/ruoyi-websocket-boot/src/main/resources/application-dev.yml b/ruoyi-websocket-boot/src/main/resources/application-dev.yml index 6378f661..9c6c46c1 100644 --- a/ruoyi-websocket-boot/src/main/resources/application-dev.yml +++ b/ruoyi-websocket-boot/src/main/resources/application-dev.yml @@ -32,9 +32,9 @@ spring: driverClassName: com.mysql.cj.jdbc.Driver # jdbc 所有参数配置参考 https://lionli.blog.csdn.net/article/details/122018562 # rewriteBatchedStatements=true 批处理优化 大幅提升批量插入更新删除性能(对数据库有性能损耗 使用批量操作应考虑性能问题) - url: jdbc:mysql://localhost:3306/cai-new?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true + url: jdbc:mysql://124.222.254.188:4306/cai-v2?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true username: root - password: 123 + password: 383200134 # 从库数据源 slave: lazy: true @@ -69,7 +69,7 @@ spring: # 数据库索引 database: 12 # 密码(如没有密码请注释掉) - password: 123 + password: 383200134 # 连接超时时间 timeout: 15s # 是否开启ssl @@ -102,18 +102,18 @@ mail: enabled: false spring: rabbitmq: - addresses: 127.0.0.1 #ip地址 + addresses: 124.222.254.188 #ip地址 username: admin # 账号 - password: 123 # 密码 + password: 383200134 # 密码 port: 5672 virtual-host: /cai-dev agora: - app-id: 58ff3a37d91d48c7a8ef7a56fb8f62d0 - key: 0cca1a53262c4c74b0a8c653a9b7540e - secret: 4a4f734285f34aea86cef63b2a186f27 + app-id: 58ff3a37d91d48c7a8ef7a56fb8f62d0123 + key: 0cca1a53262c4c74b0a8c653a9b7540e123 + secret: 4a4f734285f34aea86cef63b2a186f27123 yunxin: - app-key: 0aaefeb8a80a9889987c5346244b58e2 - app-secret: 470345ca2832 + app-key: 0aaefeb8a80a9889987c5346244b58e2123 + app-secret: 470345ca2832123 springdoc: api-docs: enabled: false