This commit is contained in:
张良(004796)
2024-04-16 19:23:14 +08:00
parent 12c184ee1e
commit 447ad89233
23 changed files with 422 additions and 56 deletions

View File

@@ -0,0 +1,57 @@
package com.ruoyi.consumer;
import com.ruoyi.cai.ws.util.SessionObj;
import com.ruoyi.cai.ws.util.WebSocketUtils;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Component
public class RedisConsumer {
@Autowired
private RedissonClient redissonClient;
@PostConstruct
public void init(){
ExecutorService executorService = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; i++) {
executorService.execute(() -> {
while (true){
try {
System.out.println();
run();
System.out.println();
}catch (Exception e){
}
}
});
}
}
private void run() throws InterruptedException {
RBlockingQueue<SessionObj> sessionPush = redissonClient.getBlockingQueue("sessionPush");
SessionObj poll = sessionPush.take();
RLock lock = redissonClient.getLock("sessionPushLock:" + poll.getSessionKey());
try {
lock.tryLock(5,5, TimeUnit.SECONDS);
WebSocketUtils.sendMessage(poll.getSessionKey(), poll.getData());
} catch (InterruptedException e) {
//
} finally {
try {
lock.unlock();
}catch (Exception e){
//
}
}
}
}

View File

@@ -3,7 +3,7 @@ package com.ruoyi.consumer;
import com.ruoyi.cai.mq.AmqpWsProducer;
import com.ruoyi.cai.mq.constant.ws.RoomCheckDelayWsMqConstant;
import com.ruoyi.cai.ws.job.JobResp;
import com.ruoyi.job.RoomCheckJobService;
import com.ruoyi.service.RoomCheckJobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
@@ -17,22 +17,21 @@ public class RoomCheckDelayMqConsumer {
@Autowired
private RoomCheckJobService roomCheckJobService;
@RabbitListener(queues = RoomCheckDelayWsMqConstant.QUEUE_NAME
,containerFactory = "customContainerFactory")
@RabbitListener(queues = RoomCheckDelayWsMqConstant.QUEUE_NAME, concurrency = "30")
public void sendRoomCheck(String roomIdStr) {
log.info("开始执行房间检测: roomId={}",roomIdStr);
log.info("开始执行房间检测: roomId={}", roomIdStr);
try {
Long roomId = Long.valueOf(roomIdStr);
JobResp resp = roomCheckJobService.checkRoom(roomId);
if(resp.isNextCreateJob()){
if (resp.isNextCreateJob()) {
// 1分钟后继续执行
log.info("1分钟后继续执行房间检测: roomId={}",roomIdStr);
amqpWsProducer.sendRoomCheckDelay(roomIdStr,60);
log.info("1分钟后继续执行房间检测: roomId={}", roomIdStr);
amqpWsProducer.sendRoomCheckDelay(roomIdStr, 60);
}
}catch (Exception e){
log.error("每分钟定时检测房间失败! roomId={}",roomIdStr);
}finally {
log.info("结束执行房间检测: roomId={}",roomIdStr);
} catch (Exception e) {
log.error("每分钟定时检测房间失败! roomId={}", roomIdStr);
} finally {
log.info("结束执行房间检测: roomId={}", roomIdStr);
}
}

View File

@@ -16,8 +16,7 @@ public class RoomSettleDelayMqConsumer {
@Autowired
private AmqpWsProducer amqpWsProducer;
@RabbitListener(queues = RoomSettleDelayWsMqConstant.QUEUE_NAME
,containerFactory = "customContainerFactory")
@RabbitListener(queues = RoomSettleDelayWsMqConstant.QUEUE_NAME,concurrency = "30")
public void checkTimeOutMq(String message) {
log.info("开始执行预扣费: " + message);
try {

View File

@@ -0,0 +1,27 @@
package com.ruoyi.consumer;
import com.ruoyi.cai.ws.util.SendMessageI;
import com.ruoyi.cai.ws.util.SessionObj;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SendMessageWsImpl implements SendMessageI {
@Autowired
private RedissonClient redissonClient;
@Override
public void sendMessage(String sessionKey, String message){
RBlockingQueue<SessionObj> sessionPush = redissonClient.getBlockingQueue("sessionPush");
SessionObj sessionObj = new SessionObj();
sessionObj.setSessionKey(sessionKey);
sessionObj.setData(message);
try {
sessionPush.put(sessionObj);
} catch (InterruptedException e) {
//
}
}
}

View File

@@ -1,6 +1,7 @@
package com.ruoyi.job;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.service.RoomCheckJobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
@@ -20,7 +21,7 @@ public class RoomCheckJob {
/**
* 每 7分钟执行一次
*/
@Scheduled(fixedDelay = 7,timeUnit = TimeUnit.MINUTES)
@Scheduled(fixedDelay = 5,timeUnit = TimeUnit.MINUTES)
public void run(){
Set<String> all = onlineDataCache.getAll();
for (String roomIdS : all) {

View File

@@ -0,0 +1,73 @@
/*
package com.ruoyi.job;
import com.alibaba.ttl.threadpool.TtlExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.service.RoomCheckJobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.*;
@Component
@Slf4j
public class RoomCheckJobBack {
@Autowired
private OnlineDataCache onlineDataCache;
@Autowired
private RoomCheckJobService roomCheckJobService;
private final static int CPU_NUM = 100;
public static Executor CHECK_ROOM_EXECUTOR;
static {
// 丢弃队列
ThreadPoolExecutor syncExecutor = new ThreadPoolExecutor(CPU_NUM,
CPU_NUM * 2,
5,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
init("roomCheckJob-%d"),
new ThreadPoolExecutor.DiscardPolicy());
CHECK_ROOM_EXECUTOR = TtlExecutors.getTtlExecutor(syncExecutor);
}
private static ThreadFactory init(String nameFormat){
return new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
}
private static ThreadPoolExecutor initExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
BlockingQueue<Runnable> workQueue, String nameFormat){
return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue,
init(nameFormat));
}
*/
/**
* 每 1分钟执行一次
*//*
@Scheduled(fixedDelay = 3,timeUnit = TimeUnit.MINUTES)
// @PostConstruct
public void run(){
while (true) {
Set<String> all = onlineDataCache.getAll();
for (String roomIdS : all) {
try {
Long roomId = Long.valueOf(roomIdS);
CHECK_ROOM_EXECUTOR.execute(() -> {
roomCheckJobService.checkRoom(roomId);
});
}catch (Exception e){
log.info("房间检测失败!需要开发排查问题 roomId={}", roomIdS, e);
}
}
}
}
}
*/

View File

@@ -0,0 +1,101 @@
/*
package com.ruoyi.job;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.ttl.threadpool.TtlExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.ruoyi.cai.ws.cache.OnlineDataCache;
import com.ruoyi.cai.ws.constant.RedisConstant;
import com.ruoyi.cai.ws.service.SettleService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.*;
@Component
@Slf4j
public class RoomKouQianJobBak {
@Autowired
private OnlineDataCache onlineDataCache;
@Autowired
private SettleService settleService;
@Autowired
private RedissonClient redissonClient;
private final static int CPU_NUM = 300;
public static Executor CHECK_ROOM_EXECUTOR;
static {
// 丢弃队列
ThreadPoolExecutor syncExecutor = new ThreadPoolExecutor(CPU_NUM,
CPU_NUM * 2,
5,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
init("RoomKouQianJob-%d"),
new ThreadPoolExecutor.DiscardPolicy());
CHECK_ROOM_EXECUTOR = TtlExecutors.getTtlExecutor(syncExecutor);
}
private static ThreadFactory init(String nameFormat) {
return new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
}
private static ThreadPoolExecutor initExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
BlockingQueue<Runnable> workQueue, String nameFormat) {
return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue,
init(nameFormat));
}
*/
/**
* 每 1分钟执行一次
*//*
// @Scheduled(fixedDelay = 7,timeUnit = TimeUnit.MINUTES)
@PostConstruct
public void run() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
while (true) {
Set<String> all = onlineDataCache.getAll();
for (String roomIdS : all) {
try {
Long roomId = Long.valueOf(roomIdS);
CHECK_ROOM_EXECUTOR.execute(() -> {
RBucket<String> bucket = redissonClient.getBucket(RedisConstant.REDIS_P + "koufeiLock-" + roomId);
boolean lock = bucket.setIfExists(RandomUtil.randomString(10), Duration.ofMinutes(1));
if (!lock) {
return;
}
boolean success = settleService.withholdingFee(roomId);
if (!success) {
bucket.delete();
}
});
} catch (Exception e) {
log.info("房间检测失败!需要开发排查问题 roomId={}", roomIdS, e);
}
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
}
*/

View File

@@ -1,4 +1,4 @@
package com.ruoyi.job;
package com.ruoyi.service;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
@@ -48,10 +48,10 @@ public class RoomCheckJobService {
return JobResp.builder().nextCreateJob(false).build();
}
// 检查是否三分钟没有接听
log.info("调试日志:检查是否分钟没有接听 roomId={}" , roomId);
log.info("调试日志:检查是否2分钟没有接听 roomId={}" , roomId);
JobResp resp = this.checkRoomCallerTimeOut(room);
if(!resp.isNextCreateJob()){
log.info("调试日志:分钟没有接听,可能关闭了 roomId={}" , roomId);
log.info("调试日志:2分钟没有接听,可能关闭了 roomId={}" , roomId);
return resp;
}
// 检查心跳
@@ -126,9 +126,9 @@ public class RoomCheckJobService {
Long connectTimeCaller = callUserData.getConnectTime();
Long connectTimeReceiver = receiverUserData.getConnectTime();
boolean timeOut = false;
if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 180){
if(connectTimeCaller != null && DateUtil.currentSeconds() - connectTimeCaller > 120){
timeOut = true;
}else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 180){
}else if(connectTimeReceiver != null && DateUtil.currentSeconds() - connectTimeReceiver > 120){
timeOut = true;
}
if(timeOut){