diff --git a/ruoyi-admin/src/main/java/com/ruoyi/find/SendMessageHttpImpl.java b/ruoyi-admin/src/main/java/com/ruoyi/find/SendMessageHttpImpl.java
new file mode 100644
index 00000000..7b28782c
--- /dev/null
+++ b/ruoyi-admin/src/main/java/com/ruoyi/find/SendMessageHttpImpl.java
@@ -0,0 +1,12 @@
+package com.ruoyi.find;
+
+import com.ruoyi.cai.ws.util.SendMessageI;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SendMessageHttpImpl implements SendMessageI {
+
+ @Override
+ public void sendMessage(String sessionKey, String message){
+ }
+}
diff --git a/ruoyi-cai/pom.xml b/ruoyi-cai/pom.xml
index e559ba56..55fcc9c6 100644
--- a/ruoyi-cai/pom.xml
+++ b/ruoyi-cai/pom.xml
@@ -30,6 +30,12 @@
org.springframework.boot
spring-boot-starter-websocket
+
+
+ org.springframework.boot
+ spring-boot-starter-tomcat
+
+
com.ruoyi
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpWsProducer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpWsProducer.java
index 6600822e..16ac555d 100644
--- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpWsProducer.java
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/AmqpWsProducer.java
@@ -21,6 +21,7 @@ public class AmqpWsProducer {
});
}
+ @Deprecated
public void sendRoomSettleDelay(String message, Integer timeout){
rabbitTemplate.convertAndSend(RoomSettleDelayWsMqConstant.EXCHANGE_NAME,
RoomSettleDelayWsMqConstant.ROUTING_KEY,
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/Agora.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/Agora.java
index 5ec2587f..f256c4d3 100644
--- a/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/Agora.java
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/trd/Agora.java
@@ -42,8 +42,14 @@ public class Agora {
if(jsonobject == null){
return Collections.emptyList();
}
- JSONArray jsonArray = jsonobject.getJSONObject("data").getJSONArray("broadcasters");
- return jsonArray.toJavaList(String.class);
+ JSONObject data = jsonobject.getJSONObject("data");
+ if(data != null){
+ JSONArray jsonArray = data.getJSONArray("broadcasters");
+ if(jsonArray != null){
+ return jsonArray.toJavaList(String.class);
+ }
+ }
+ return new ArrayList<>();
}
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java
index 5bca70a4..0362afdc 100644
--- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/handler/RoomWebSocketHandler.java
@@ -119,6 +119,13 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler {
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage());
+ try{
+ if(session.isOpen()){
+ session.close();
+ }
+ }catch (Exception e){
+ log.error(e.getMessage());
+ }
}
/**
@@ -133,7 +140,13 @@ public class RoomWebSocketHandler extends AbstractWebSocketHandler {
// TODO 连接关闭 是否要删除fd的关系
WebSocketSessionHolder.removeSession(session.getId());
log.info("[disconnect] sessionId: {},token:{}", session.getId(), token);
- session.close();
+ try{
+ if(session.isOpen()){
+ session.close();
+ }
+ }catch (Exception e){
+ log.error(e.getMessage());
+ }
}
/**
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/holder/WebSocketSessionHolder.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/holder/WebSocketSessionHolder.java
index ff74dd2c..193e7c93 100644
--- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/holder/WebSocketSessionHolder.java
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/holder/WebSocketSessionHolder.java
@@ -4,6 +4,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.web.socket.WebSocketSession;
+import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -17,19 +18,36 @@ import java.util.concurrent.ConcurrentHashMap;
public class WebSocketSessionHolder {
private static final Map USER_SESSION_MAP = new ConcurrentHashMap<>();
+ private static final Map USER_SESSION_ID_MAP = new ConcurrentHashMap<>();
public static int size(){
return USER_SESSION_MAP.size();
}
- public static void addSession(String sessionKey, WebSocketSession session) {
+ public static void addSession(String sessionKey, WebSocketSession session,Long userId) {
USER_SESSION_MAP.put(sessionKey, session);
+ if(USER_SESSION_ID_MAP.containsKey(userId)){ // T人动作
+ String sessionKeyOld = USER_SESSION_ID_MAP.get(userId);
+ if(sessionKeyOld != null){
+ WebSocketSession webSocketSession = USER_SESSION_MAP.get(sessionKeyOld);
+ if(webSocketSession != null){
+ if(webSocketSession.isOpen()){
+ try {
+ webSocketSession.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ removeSession(sessionKeyOld);
+ }
+
+ }
+ }
+ USER_SESSION_ID_MAP.put(userId,sessionKey);
}
public static void removeSession(String sessionKey) {
- if (USER_SESSION_MAP.containsKey(sessionKey)) {
- USER_SESSION_MAP.remove(sessionKey);
- }
+ USER_SESSION_MAP.remove(sessionKey);
}
public static WebSocketSession getSessions(String sessionKey) {
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/interceptor/PlusWebSocketInterceptor.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/interceptor/PlusWebSocketInterceptor.java
index 814089cf..d76d9211 100644
--- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/interceptor/PlusWebSocketInterceptor.java
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/interceptor/PlusWebSocketInterceptor.java
@@ -29,6 +29,7 @@ public class PlusWebSocketInterceptor implements HandshakeInterceptor {
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) {
+
return true;
}
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java
index 467f9b92..a01ec90e 100644
--- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/manager/WebSocketManager.java
@@ -21,6 +21,8 @@ import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.concurrent.TimeUnit;
+
@Component
public class WebSocketManager {
@Autowired
@@ -64,14 +66,21 @@ public class WebSocketManager {
return room.getRoomId();
}
RLock lock = redissonClient.getLock(String.format(RedisConstant.INIT_ROOM_LOCK, userCall.getFromUid(), userCall.getToUid()));
- boolean b = lock.tryLock();
- if(!b){
- throw new ServiceException("系统繁忙");
- }
+ boolean b;
try {
+ b = lock.tryLock(1, TimeUnit.MINUTES);
+ if(!b){
+ throw new ServiceException("系统繁忙");
+ }
roomId = initRoom(userCall);
- }finally {
- lock.unlock();
+ } catch (InterruptedException e) {
+ throw new ServiceException("系统繁忙");
+ } finally {
+ try {
+ lock.unlock();
+ }catch (Exception e){
+ // ignore
+ }
}
return roomId;
}
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java
index c3eb40f0..fa64326f 100644
--- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/processon/OpenLogic.java
@@ -71,7 +71,7 @@ public class OpenLogic {
WsExceptionUtil.throwExceptionFast(session,"无效token");
return;
}
- WebSocketSessionHolder.addSession(session.getId(), session);
+ WebSocketSessionHolder.addSession(session.getId(), session, wsToken.getUserId());
Long roomId = wsToken.getRoomId();
Long userId = wsToken.getUserId();
Room room = roomService.load(roomId);
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java
index f460d044..baaa5e78 100644
--- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/service/SettleService.java
@@ -27,6 +27,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
@Component
@Slf4j
@@ -99,7 +100,7 @@ public class SettleService {
/**
* 结算处理
*/
- public SettleResp processOn(Long roomId){
+ public SettleResp processOn(Long roomId) {
Room room = roomService.load(roomId);
if(room == null){
log.warn("房间不存在,无需结算 roomId={}",roomId);
@@ -112,17 +113,23 @@ public class SettleService {
log.info("正在结算中,稍等!");
return SettleResp.builder().nextRun(true).build();
}
- boolean lockFlag = clientLock.tryLock();
- if(!lockFlag){
- log.info("正在结算中,稍等!");
- return SettleResp.builder().nextRun(true).build();
- }
+ boolean lockFlag = false;
try {
+ lockFlag = clientLock.tryLock(5,60, TimeUnit.SECONDS);
+ if(!lockFlag){
+ log.info("正在结算中,稍等!");
+ return SettleResp.builder().nextRun(true).build();
+ }
return deal(room);
- }finally {
- clientLock.unlock();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ clientLock.unlock();
+ }catch (Exception e){
+ // ignore
+ }
}
-
}
@Autowired
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/RoomWebSocketUtil.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/RoomWebSocketUtil.java
index eddecdf2..72b0e119 100644
--- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/RoomWebSocketUtil.java
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/RoomWebSocketUtil.java
@@ -2,22 +2,41 @@ package com.ruoyi.cai.ws.util;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.cai.ws.dto.WsR;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
+import javax.annotation.PostConstruct;
import java.util.List;
+@Component
public class RoomWebSocketUtil {
+
+ @Autowired
+ private SendMessageI sendMessage;
+
+ private static SendMessageI sendMessageStatic;
+
+ @PostConstruct
+ public void init(){
+ sendMessageStatic = sendMessage;
+ }
+
+
public static void sendSendMessage(String sessionKey, WsR r){
- WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r));
+ sendMessageStatic.sendMessage(sessionKey, JSON.toJSONString(r));
+// WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r));
}
public static void sendSendMessage(WebSocketSession sessionKey, WsR r){
- WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r));
+ sendMessageStatic.sendMessage(sessionKey.getId(), JSON.toJSONString(r));
+// WebSocketUtils.sendMessage(sessionKey, JSON.toJSONString(r));
}
public static void sendSendMessage(List sessionKey, WsR r){
for (String s : sessionKey) {
- WebSocketUtils.sendMessage(s, JSON.toJSONString(r));
+ sendMessageStatic.sendMessage(s, JSON.toJSONString(r));
+// WebSocketUtils.sendMessage(s, JSON.toJSONString(r));
}
}
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SendMessageI.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SendMessageI.java
new file mode 100644
index 00000000..357df91e
--- /dev/null
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SendMessageI.java
@@ -0,0 +1,6 @@
+package com.ruoyi.cai.ws.util;
+
+
+public interface SendMessageI {
+ void sendMessage(String sessionKey, String message);
+}
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SessionObj.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SessionObj.java
new file mode 100644
index 00000000..11a7cd25
--- /dev/null
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/SessionObj.java
@@ -0,0 +1,9 @@
+package com.ruoyi.cai.ws.util;
+
+import lombok.Data;
+
+@Data
+public class SessionObj {
+ private String sessionKey;
+ private String data;
+}
diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WebSocketUtils.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WebSocketUtils.java
index 78f7c1b7..9665e219 100644
--- a/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WebSocketUtils.java
+++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/ws/util/WebSocketUtils.java
@@ -45,7 +45,9 @@ public class WebSocketUtils {
public static boolean close(WebSocketSession session) {
if(session != null){
try {
- session.close();
+ if(session.isOpen()){
+ session.close();
+ }
return true;
} catch (IOException e) {
log.error("关闭ws失败,session={}",session,e);
diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RedisConsumer.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RedisConsumer.java
new file mode 100644
index 00000000..9eec3b50
--- /dev/null
+++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RedisConsumer.java
@@ -0,0 +1,57 @@
+package com.ruoyi.consumer;
+
+import com.ruoyi.cai.ws.util.SessionObj;
+import com.ruoyi.cai.ws.util.WebSocketUtils;
+import org.redisson.api.RBlockingQueue;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class RedisConsumer {
+ @Autowired
+ private RedissonClient redissonClient;
+
+
+ @PostConstruct
+ public void init(){
+ ExecutorService executorService = Executors.newFixedThreadPool(20);
+ for (int i = 0; i < 20; i++) {
+ executorService.execute(() -> {
+ while (true){
+ try {
+ System.out.println();
+ run();
+ System.out.println();
+ }catch (Exception e){
+
+ }
+ }
+ });
+ }
+ }
+
+ private void run() throws InterruptedException {
+ RBlockingQueue sessionPush = redissonClient.getBlockingQueue("sessionPush");
+ SessionObj poll = sessionPush.take();
+ RLock lock = redissonClient.getLock("sessionPushLock:" + poll.getSessionKey());
+ try {
+ lock.tryLock(5,5, TimeUnit.SECONDS);
+ WebSocketUtils.sendMessage(poll.getSessionKey(), poll.getData());
+ } catch (InterruptedException e) {
+ //
+ } finally {
+ try {
+ lock.unlock();
+ }catch (Exception e){
+ //
+ }
+ }
+ }
+}
diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomCheckDelayMqConsumer.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomCheckDelayMqConsumer.java
index 4e26ebae..ca3259b8 100644
--- a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomCheckDelayMqConsumer.java
+++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomCheckDelayMqConsumer.java
@@ -3,7 +3,7 @@ package com.ruoyi.consumer;
import com.ruoyi.cai.mq.AmqpWsProducer;
import com.ruoyi.cai.mq.constant.ws.RoomCheckDelayWsMqConstant;
import com.ruoyi.cai.ws.job.JobResp;
-import com.ruoyi.job.RoomCheckJobService;
+import com.ruoyi.service.RoomCheckJobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
@@ -17,22 +17,21 @@ public class RoomCheckDelayMqConsumer {
@Autowired
private RoomCheckJobService roomCheckJobService;
- @RabbitListener(queues = RoomCheckDelayWsMqConstant.QUEUE_NAME
- ,containerFactory = "customContainerFactory")
+ @RabbitListener(queues = RoomCheckDelayWsMqConstant.QUEUE_NAME, concurrency = "30")
public void sendRoomCheck(String roomIdStr) {
- log.info("开始执行房间检测: roomId={}",roomIdStr);
+ log.info("开始执行房间检测: roomId={}", roomIdStr);
try {
Long roomId = Long.valueOf(roomIdStr);
JobResp resp = roomCheckJobService.checkRoom(roomId);
- if(resp.isNextCreateJob()){
+ if (resp.isNextCreateJob()) {
// 1分钟后继续执行
- log.info("1分钟后继续执行房间检测: roomId={}",roomIdStr);
- amqpWsProducer.sendRoomCheckDelay(roomIdStr,60);
+ log.info("1分钟后继续执行房间检测: roomId={}", roomIdStr);
+ amqpWsProducer.sendRoomCheckDelay(roomIdStr, 60);
}
- }catch (Exception e){
- log.error("每分钟定时检测房间失败! roomId={}",roomIdStr);
- }finally {
- log.info("结束执行房间检测: roomId={}",roomIdStr);
+ } catch (Exception e) {
+ log.error("每分钟定时检测房间失败! roomId={}", roomIdStr);
+ } finally {
+ log.info("结束执行房间检测: roomId={}", roomIdStr);
}
}
diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomSettleDelayMqConsumer.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomSettleDelayMqConsumer.java
index d5d37637..f4da2086 100644
--- a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomSettleDelayMqConsumer.java
+++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/RoomSettleDelayMqConsumer.java
@@ -16,8 +16,7 @@ public class RoomSettleDelayMqConsumer {
@Autowired
private AmqpWsProducer amqpWsProducer;
- @RabbitListener(queues = RoomSettleDelayWsMqConstant.QUEUE_NAME
- ,containerFactory = "customContainerFactory")
+ @RabbitListener(queues = RoomSettleDelayWsMqConstant.QUEUE_NAME,concurrency = "30")
public void checkTimeOutMq(String message) {
log.info("开始执行预扣费: " + message);
try {
diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/SendMessageWsImpl.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/SendMessageWsImpl.java
new file mode 100644
index 00000000..11c869a1
--- /dev/null
+++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/consumer/SendMessageWsImpl.java
@@ -0,0 +1,27 @@
+package com.ruoyi.consumer;
+
+import com.ruoyi.cai.ws.util.SendMessageI;
+import com.ruoyi.cai.ws.util.SessionObj;
+import org.redisson.api.RBlockingQueue;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SendMessageWsImpl implements SendMessageI {
+ @Autowired
+ private RedissonClient redissonClient;
+
+ @Override
+ public void sendMessage(String sessionKey, String message){
+ RBlockingQueue sessionPush = redissonClient.getBlockingQueue("sessionPush");
+ SessionObj sessionObj = new SessionObj();
+ sessionObj.setSessionKey(sessionKey);
+ sessionObj.setData(message);
+ try {
+ sessionPush.put(sessionObj);
+ } catch (InterruptedException e) {
+ //
+ }
+ }
+}
diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJob.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJob.java
index 0777cd41..7ad4c061 100644
--- a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJob.java
+++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJob.java
@@ -1,6 +1,7 @@
package com.ruoyi.job;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
+import com.ruoyi.service.RoomCheckJobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
@@ -20,7 +21,7 @@ public class RoomCheckJob {
/**
* 每 7分钟执行一次
*/
- @Scheduled(fixedDelay = 7,timeUnit = TimeUnit.MINUTES)
+ @Scheduled(fixedDelay = 5,timeUnit = TimeUnit.MINUTES)
public void run(){
Set all = onlineDataCache.getAll();
for (String roomIdS : all) {
diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobBack.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobBack.java
new file mode 100644
index 00000000..5fa2aa11
--- /dev/null
+++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobBack.java
@@ -0,0 +1,73 @@
+/*
+package com.ruoyi.job;
+
+import com.alibaba.ttl.threadpool.TtlExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.ruoyi.cai.ws.cache.OnlineDataCache;
+import com.ruoyi.service.RoomCheckJobService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.Set;
+import java.util.concurrent.*;
+
+@Component
+@Slf4j
+public class RoomCheckJobBack {
+ @Autowired
+ private OnlineDataCache onlineDataCache;
+ @Autowired
+ private RoomCheckJobService roomCheckJobService;
+
+ private final static int CPU_NUM = 100;
+
+ public static Executor CHECK_ROOM_EXECUTOR;
+
+ static {
+ // 丢弃队列
+ ThreadPoolExecutor syncExecutor = new ThreadPoolExecutor(CPU_NUM,
+ CPU_NUM * 2,
+ 5,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ init("roomCheckJob-%d"),
+ new ThreadPoolExecutor.DiscardPolicy());
+ CHECK_ROOM_EXECUTOR = TtlExecutors.getTtlExecutor(syncExecutor);
+
+ }
+
+ private static ThreadFactory init(String nameFormat){
+ return new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
+ }
+
+ private static ThreadPoolExecutor initExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
+ BlockingQueue workQueue, String nameFormat){
+ return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue,
+ init(nameFormat));
+ }
+ */
+/**
+ * 每 1分钟执行一次
+ *//*
+
+ @Scheduled(fixedDelay = 3,timeUnit = TimeUnit.MINUTES)
+// @PostConstruct
+ public void run(){
+ while (true) {
+ Set all = onlineDataCache.getAll();
+ for (String roomIdS : all) {
+ try {
+ Long roomId = Long.valueOf(roomIdS);
+ CHECK_ROOM_EXECUTOR.execute(() -> {
+ roomCheckJobService.checkRoom(roomId);
+ });
+ }catch (Exception e){
+ log.info("房间检测失败!需要开发排查问题 roomId={}", roomIdS, e);
+ }
+ }
+ }
+ }
+}
+*/
diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomKouQianJobBak.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomKouQianJobBak.java
new file mode 100644
index 00000000..45c4c5d9
--- /dev/null
+++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomKouQianJobBak.java
@@ -0,0 +1,101 @@
+/*
+package com.ruoyi.job;
+
+import cn.hutool.core.util.RandomUtil;
+import com.alibaba.ttl.threadpool.TtlExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.ruoyi.cai.ws.cache.OnlineDataCache;
+import com.ruoyi.cai.ws.constant.RedisConstant;
+import com.ruoyi.cai.ws.service.SettleService;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RBucket;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.*;
+
+@Component
+@Slf4j
+public class RoomKouQianJobBak {
+
+ @Autowired
+ private OnlineDataCache onlineDataCache;
+ @Autowired
+ private SettleService settleService;
+ @Autowired
+ private RedissonClient redissonClient;
+
+ private final static int CPU_NUM = 300;
+
+ public static Executor CHECK_ROOM_EXECUTOR;
+
+ static {
+ // 丢弃队列
+ ThreadPoolExecutor syncExecutor = new ThreadPoolExecutor(CPU_NUM,
+ CPU_NUM * 2,
+ 5,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ init("RoomKouQianJob-%d"),
+ new ThreadPoolExecutor.DiscardPolicy());
+ CHECK_ROOM_EXECUTOR = TtlExecutors.getTtlExecutor(syncExecutor);
+
+ }
+
+ private static ThreadFactory init(String nameFormat) {
+ return new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
+ }
+
+ private static ThreadPoolExecutor initExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
+ BlockingQueue workQueue, String nameFormat) {
+ return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue,
+ init(nameFormat));
+ }
+
+ */
+/**
+ * 每 1分钟执行一次
+ *//*
+
+// @Scheduled(fixedDelay = 7,timeUnit = TimeUnit.MINUTES)
+ @PostConstruct
+ public void run() {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.execute(() -> {
+ while (true) {
+ Set all = onlineDataCache.getAll();
+ for (String roomIdS : all) {
+ try {
+ Long roomId = Long.valueOf(roomIdS);
+ CHECK_ROOM_EXECUTOR.execute(() -> {
+ RBucket bucket = redissonClient.getBucket(RedisConstant.REDIS_P + "koufeiLock-" + roomId);
+ boolean lock = bucket.setIfExists(RandomUtil.randomString(10), Duration.ofMinutes(1));
+ if (!lock) {
+ return;
+ }
+ boolean success = settleService.withholdingFee(roomId);
+ if (!success) {
+ bucket.delete();
+ }
+ });
+ } catch (Exception e) {
+ log.info("房间检测失败!需要开发排查问题 roomId={}", roomIdS, e);
+ }
+ }
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ }
+
+
+}
+*/
diff --git a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobService.java b/ruoyi-websocket-boot/src/main/java/com/ruoyi/service/RoomCheckJobService.java
similarity index 96%
rename from ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobService.java
rename to ruoyi-websocket-boot/src/main/java/com/ruoyi/service/RoomCheckJobService.java
index b32e3c1f..4b52922d 100644
--- a/ruoyi-websocket-boot/src/main/java/com/ruoyi/job/RoomCheckJobService.java
+++ b/ruoyi-websocket-boot/src/main/java/com/ruoyi/service/RoomCheckJobService.java
@@ -1,4 +1,4 @@
-package com.ruoyi.job;
+package com.ruoyi.service;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
@@ -48,10 +48,10 @@ public class RoomCheckJobService {
return JobResp.builder().nextCreateJob(false).build();
}
// 检查是否三分钟没有接听
- log.info("调试日志:检查是否三分钟没有接听 roomId={}" , roomId);
+ log.info("调试日志:检查是否2分钟没有接听 roomId={}" , roomId);
JobResp resp = this.checkRoomCallerTimeOut(room);
if(!resp.isNextCreateJob()){
- log.info("调试日志:三分钟没有接听,可能关闭了 roomId={}" , roomId);
+ log.info("调试日志:2分钟没有接听,可能关闭了 roomId={}" , roomId);
return resp;
}
// 检查心跳
@@ -126,9 +126,9 @@ public class RoomCheckJobService {
Long connectTimeCaller = callUserData.getConnectTime();
Long connectTimeReceiver = receiverUserData.getConnectTime();
boolean timeOut = false;
- if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){
+ if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 120){
timeOut = true;
- }else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){
+ }else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 120){
timeOut = true;
}
if(timeOut){
diff --git a/ruoyi-websocket-boot/src/main/resources/application-dev.yml b/ruoyi-websocket-boot/src/main/resources/application-dev.yml
index 6378f661..9c6c46c1 100644
--- a/ruoyi-websocket-boot/src/main/resources/application-dev.yml
+++ b/ruoyi-websocket-boot/src/main/resources/application-dev.yml
@@ -32,9 +32,9 @@ spring:
driverClassName: com.mysql.cj.jdbc.Driver
# jdbc 所有参数配置参考 https://lionli.blog.csdn.net/article/details/122018562
# rewriteBatchedStatements=true 批处理优化 大幅提升批量插入更新删除性能(对数据库有性能损耗 使用批量操作应考虑性能问题)
- url: jdbc:mysql://localhost:3306/cai-new?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true
+ url: jdbc:mysql://124.222.254.188:4306/cai-v2?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true
username: root
- password: 123
+ password: 383200134
# 从库数据源
slave:
lazy: true
@@ -69,7 +69,7 @@ spring:
# 数据库索引
database: 12
# 密码(如没有密码请注释掉)
- password: 123
+ password: 383200134
# 连接超时时间
timeout: 15s
# 是否开启ssl
@@ -102,18 +102,18 @@ mail:
enabled: false
spring:
rabbitmq:
- addresses: 127.0.0.1 #ip地址
+ addresses: 124.222.254.188 #ip地址
username: admin # 账号
- password: 123 # 密码
+ password: 383200134 # 密码
port: 5672
virtual-host: /cai-dev
agora:
- app-id: 58ff3a37d91d48c7a8ef7a56fb8f62d0
- key: 0cca1a53262c4c74b0a8c653a9b7540e
- secret: 4a4f734285f34aea86cef63b2a186f27
+ app-id: 58ff3a37d91d48c7a8ef7a56fb8f62d0123
+ key: 0cca1a53262c4c74b0a8c653a9b7540e123
+ secret: 4a4f734285f34aea86cef63b2a186f27123
yunxin:
- app-key: 0aaefeb8a80a9889987c5346244b58e2
- app-secret: 470345ca2832
+ app-key: 0aaefeb8a80a9889987c5346244b58e2123
+ app-secret: 470345ca2832123
springdoc:
api-docs:
enabled: false