This commit is contained in:
张良(004796)
2024-01-30 11:39:38 +08:00
parent d0bfbf36a1
commit 135f45d2c3
18 changed files with 407 additions and 37 deletions

View File

@@ -3,6 +3,9 @@ package com.ruoyi.cai.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.ruoyi.cai.domain.SysPush;
import com.ruoyi.cai.dto.admin.query.syspush.SysPushParams;
import com.ruoyi.cai.dto.admin.vo.syspush.SendSysPushResp;
import java.util.List;
/**
* 推送系统消息Service接口
@@ -13,4 +16,16 @@ import com.ruoyi.cai.dto.admin.query.syspush.SysPushParams;
public interface SysPushService extends IService<SysPush> {
void saveSysPush(SysPushParams params);
void fastStart(Long id);
void handStart(Long id);
void doRun(Long id);
SendSysPushResp sendSysPush(SysPush sysPush, List<Long> userIds);
void checkAll();
void closeSysPush(Long id);
}

View File

@@ -2,18 +2,27 @@ package com.ruoyi.cai.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.ruoyi.cai.domain.SysPush;
import com.ruoyi.cai.domain.SysPushLog;
import com.ruoyi.cai.domain.User;
import com.ruoyi.cai.dto.admin.query.syspush.SysPushImageContextDTO;
import com.ruoyi.cai.dto.admin.query.syspush.SysPushMasterDTO;
import com.ruoyi.cai.dto.admin.query.syspush.SysPushParams;
import com.ruoyi.cai.dto.admin.vo.syspush.MinUserDTO;
import com.ruoyi.cai.dto.admin.vo.syspush.SendSysPushResp;
import com.ruoyi.cai.enums.GenderEnum;
import com.ruoyi.cai.enums.SystemConfigEnum;
import com.ruoyi.cai.enums.systempush.SystemPushStatusEnum;
import com.ruoyi.cai.enums.systempush.SystemPushTypeEnum;
import com.ruoyi.cai.enums.systempush.*;
import com.ruoyi.cai.manager.SystemConfigManager;
import com.ruoyi.cai.mapper.SysPushMapper;
import com.ruoyi.cai.mq.AmqpProducer;
import com.ruoyi.cai.mq.CommonDelayTypeEnum;
import com.ruoyi.cai.mq.dto.CommonDelayDto;
import com.ruoyi.cai.mq.handle.dto.SysPushDTO;
import com.ruoyi.cai.notice.data.NoticeMsgR;
import com.ruoyi.cai.notice.data.NoticeOnlyImageR;
import com.ruoyi.cai.notice.data.NoticeR;
@@ -24,17 +33,23 @@ import com.ruoyi.cai.notice.data.child.SimpleImageTextData;
import com.ruoyi.cai.notice.enums.MessageBaseTypeEnum;
import com.ruoyi.cai.service.SysPushService;
import com.ruoyi.cai.service.UserService;
import com.ruoyi.cai.util.CaiDateUtil;
import com.ruoyi.common.exception.ServiceException;
import com.ruoyi.common.utils.BeanConvertUtil;
import com.ruoyi.yunxin.Yunxin;
import com.ruoyi.yunxin.enums.ImTypeEnum;
import com.ruoyi.yunxin.resp.YxCommonR;
import com.ruoyi.yunxin.resp.YxDataR;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* 推送系统消息Service业务层处理
@@ -43,6 +58,7 @@ import java.util.List;
* @date 2024-01-28
*/
@Service
@Slf4j
public class SysPushServiceImpl extends ServiceImpl<SysPushMapper, SysPush> implements SysPushService {
@Autowired
@@ -51,6 +67,8 @@ public class SysPushServiceImpl extends ServiceImpl<SysPushMapper, SysPush> impl
private Yunxin yunxin;
@Autowired
private SystemConfigManager systemConfigManager;
@Autowired
private AmqpProducer amqpProducer;
@Override
public void saveSysPush(SysPushParams params) {
@@ -60,19 +78,38 @@ public class SysPushServiceImpl extends ServiceImpl<SysPushMapper, SysPush> impl
if(systemPushType == null){
throw new ServiceException("消息类型有误!");
}
User user = userService.getByUserCode(master.getUsercode());
if(user == null){
throw new ServiceException("蜜瓜号不存在");
SystemPushGroupIdEnum groupIdEnum = SystemPushGroupIdEnum.getByCode(master.getGroupId());
if(groupIdEnum == null){
throw new ServiceException("发送类型有误");
}
SysPush sysPush = new SysPush();
if(groupIdEnum == SystemPushGroupIdEnum.ONE_USER){
User user = userService.getByUserCode(master.getUsercode());
if(user == null){
throw new ServiceException("蜜瓜号不存在!");
}
sysPush.setUserId(user.getId());
sysPush.setUsercode(user.getUsercode());
}
SystemPushSendTimeTypeEnum systemPushSendTime = SystemPushSendTimeTypeEnum.getByCode(master.getSendTimeType());
if(systemPushSendTime == null){
throw new ServiceException("发送方式有误!");
}
sysPush.setSendTimeType(systemPushSendTime.getCode());
if(systemPushSendTime == SystemPushSendTimeTypeEnum.SCHEDULED_SEND){
long between = CaiDateUtil.diff(sysPush.getEndTime(), LocalDateTime.now());
if(between < 0){
throw new ServiceException("定时发送时间不能少于当前时间");
}
if(between < 60*3){
throw new ServiceException("定时发送最低限制为3分钟以后");
}
sysPush.setSendTime(master.getSendTime());
}
sysPush.setGroupId(master.getGroupId());
sysPush.setType(master.getType());
sysPush.setUserId(user.getId());
sysPush.setUsercode(master.getUsercode());
sysPush.setTitle(master.getTitle());
sysPush.setRemark(master.getRemark());
sysPush.setSendTimeType(master.getSendTimeType());
sysPush.setSendTime(master.getSendTime());
sysPush.setStatus(SystemPushStatusEnum.NO_RUN.getCode());
String context = null;
switch (systemPushType){
@@ -135,9 +172,146 @@ public class SysPushServiceImpl extends ServiceImpl<SysPushMapper, SysPush> impl
sysPush.setContent(context);
sysPush.setParams(JSON.toJSONString(params));
this.save(sysPush);
try {
// 小于 < 12小时
long between = CaiDateUtil.diff(sysPush.getEndTime(), LocalDateTime.now());
CommonDelayDto dto = new CommonDelayDto();
dto.setSysPushId(sysPush.getId());
dto.setType(CommonDelayTypeEnum.SYS_PUSH.getCode());
amqpProducer.sendCommonDelayMq(dto,(int)between+2);
}catch (Exception e){
log.error("延时任务发送失败!");
}
}
@Override
public void fastStart(Long id){
boolean update = this.update(Wrappers.lambdaUpdate(SysPush.class)
.eq(SysPush::getId, id)
.eq(SysPush::getStatus, SystemPushStatusEnum.NO_RUN.getCode())
.set(SysPush::getStatus, SystemPushStatusEnum.RUNNING.getCode()));
if(!update){
throw new ServiceException("正在执行中,请稍等");
}
this.doRun(id);
}
@Override
public void handStart(Long id){
boolean update = this.update(Wrappers.lambdaUpdate(SysPush.class)
.eq(SysPush::getId, id)
.eq(SysPush::getStatus, SystemPushStatusEnum.NO_RUN.getCode())
.set(SysPush::getStatus, SystemPushStatusEnum.RUNNING.getCode()));
if(!update){
throw new ServiceException("正在执行中,请稍等");
}
SysPushDTO sysPush = new SysPushDTO();
sysPush.setSysPushId(id);
amqpProducer.sendCommonMq(sysPush);
}
@Override
public void doRun(Long id){
SysPush sysPush = this.getById(id);
Integer status = sysPush.getStatus();
if(SystemPushStatusEnum.COMPLETE.getCode().equals(status)){
throw new ServiceException("不允许重复执行");
}
if(SystemPushStatusEnum.RUNNING.getCode().equals(status)
|| SystemPushStatusEnum.QUEUE_RUNNING.getCode().equals(status)){
throw new ServiceException("任务正在执行中,不允许重复执行");
}
boolean update = this.update(Wrappers.lambdaUpdate(SysPush.class)
.eq(SysPush::getId, sysPush.getId())
.eq(SysPush::getStatus, SystemPushStatusEnum.RUNNING.getCode())
.set(SysPush::getStatus, SystemPushStatusEnum.COMPLETE.getCode()));
if(!update){
throw new ServiceException("队列正在执行中");
}
SystemPushGroupIdEnum groupId = SystemPushGroupIdEnum.getByCode(sysPush.getGroupId());
if(groupId == null){
this.update(Wrappers.lambdaUpdate(SysPush.class)
.eq(SysPush::getId, sysPush.getId())
.set(SysPush::getStatus,SystemPushStatusEnum.FAIL.getCode()));
log.error("未找到执行的人! sysPush={}",JSON.toJSONString(sysPush));
return;
}
List<MinUserDTO> minUserList = new ArrayList<>();
switch (groupId) {
case ONE_USER:
User user = userService.getById(sysPush.getUserId());
if(user != null){
minUserList.add(new MinUserDTO(user.getId(),user.getUsercode(),user.getNickname()));
}
break;
case MEN_USER:
List<User> menUserList = userService.list(Wrappers.lambdaQuery(User.class)
.select(User::getId, User::getUsercode, User::getNickname)
.eq(User::getGender, GenderEnum.MAN.getCode())
.eq(User::getStatus, 0));
minUserList = menUserList.stream().map(i -> new MinUserDTO(i.getId(), i.getUsercode(), i.getNickname())).collect(Collectors.toList());
break;
case WOMEN_USER:
List<User> womenUserList = userService.list(Wrappers.lambdaQuery(User.class)
.select(User::getId, User::getUsercode, User::getNickname)
.eq(User::getGender, GenderEnum.WOMEN.getCode())
.eq(User::getStatus, 0));
minUserList = womenUserList.stream().map(i -> new MinUserDTO(i.getId(), i.getUsercode(), i.getNickname())).collect(Collectors.toList());
break;
case ANCHOR_USER:
List<User> anchorUserList = userService.list(Wrappers.lambdaQuery(User.class)
.select(User::getId, User::getUsercode, User::getNickname)
.eq(User::getIsAnchor, 1)
.eq(User::getStatus, 0));
minUserList = anchorUserList.stream().map(i -> new MinUserDTO(i.getId(), i.getUsercode(), i.getNickname())).collect(Collectors.toList());
break;
case ALL_USER:
List<User> allUserList = userService.list(Wrappers.lambdaQuery(User.class)
.select(User::getId, User::getUsercode, User::getNickname)
.eq(User::getStatus, 0));
minUserList = allUserList.stream().map(i -> new MinUserDTO(i.getId(), i.getUsercode(), i.getNickname())).collect(Collectors.toList());
break;
default:
break;
}
if(minUserList.isEmpty()){
this.update(Wrappers.lambdaUpdate(SysPush.class)
.eq(SysPush::getId, sysPush.getId())
.set(SysPush::getStatus,SystemPushStatusEnum.FAIL.getCode()));
log.error("未找到发送的人! sysPush={}",JSON.toJSONString(sysPush));
return;
}
List<List<MinUserDTO>> partition = Lists.partition(minUserList, 500);
for (List<MinUserDTO> minUsers : partition) {
SysPushLog sysPushLog = new SysPushLog();
sysPushLog.setSysPushId(sysPush.getId());
sysPushLog.setUserJson(JSON.toJSONString(minUsers));
sysPushLog.setNum(minUsers.size());
try {
List<Long> userIds = minUsers.stream().map(MinUserDTO::getUserId).collect(Collectors.toList());
SendSysPushResp resp = this.sendSysPush(sysPush, userIds);
YxDataR<YxCommonR> imResp = resp.getImResp();
if(imResp != null && imResp.isSuccess()){
sysPushLog.setStatus(SystemPushLogStatusEnum.SEND_SUCCESS.getCode());
}else{
sysPushLog.setStatus(SystemPushLogStatusEnum.SEND_SUCCESS.getCode());
}
sysPushLog.setResult(JSON.toJSONString(imResp));
}catch (Exception e){
log.error("发送失败!",e);
sysPushLog.setStatus(SystemPushLogStatusEnum.SEND_FAIL.getCode());
sysPushLog.setResult(e.getMessage());
}
this.save(sysPush);
}
this.update(Wrappers.lambdaUpdate(SysPush.class)
.eq(SysPush::getId, sysPush.getId())
.set(SysPush::getStatus,SystemPushStatusEnum.COMPLETE.getCode()));
}
@Override
public SendSysPushResp sendSysPush(SysPush sysPush, List<Long> userIds){
String cosUrl = systemConfigManager.getSystemConfig(SystemConfigEnum.COS_DOMAIN);
Integer type = sysPush.getType();
@@ -196,4 +370,62 @@ public class SysPushServiceImpl extends ServiceImpl<SysPushMapper, SysPush> impl
resp.setImResp(r);
return resp;
}
@Override
public void checkAll(){
List<Long> expireIds = new ArrayList<>(); // 过期未执行的
List<Long> expireConsumer = new ArrayList<>(); // 还有12小时过期的
Page<SysPush> page = new Page<>();
page.setSize(100L);
long current = 0;
while (true){
current++;
page.setCurrent(current);
Page<SysPush> res = this.page(page,Wrappers.lambdaQuery(SysPush.class)
.eq(SysPush::getSendTimeType, SystemPushSendTimeTypeEnum.SCHEDULED_SEND.getCode())
.eq(SysPush::getStatus,SystemPushStatusEnum.NO_RUN.getCode()));
List<SysPush> records = res.getRecords();
if(records.isEmpty()){
break;
}
for (SysPush record : records) {
if(record.getSendTime().isAfter(LocalDateTime.now())){
expireIds.add(record.getId());
}else if(record.getEndTime().plusHours(12).isAfter(LocalDateTime.now())){ // 还有12小时过期
expireConsumer.add(record.getId());
}
}
}
for (Long expireId : expireIds) {
try {
this.fastStart(expireId);
}catch (Exception e){
//
}
}
for (Long expireId : expireConsumer) {
SysPush sysPush = this.getById(expireId);
if(sysPush == null || !Objects.equals(sysPush.getStatus(), SystemPushStatusEnum.NO_RUN.getCode())){
return;
}
// < 12小时
long between = CaiDateUtil.diff(sysPush.getSendTime(),LocalDateTime.now());
if(between > 0 && between < 60*60*12){
CommonDelayDto dto = new CommonDelayDto();
dto.setForbidId(expireId);
dto.setType(CommonDelayTypeEnum.SYS_PUSH.getCode());
amqpProducer.sendCommonDelayMq(dto,(int)between+5);
}
}
}
@Override
public void closeSysPush(Long id) {
this.update(Wrappers.lambdaUpdate(SysPush.class)
.eq(SysPush::getId,id)
.eq(SysPush::getStatus,SystemPushStatusEnum.NO_RUN.getCode())
.set(SysPush::getStatus,SystemPushStatusEnum.COMPLETE.getCode()));
}
}

View File

@@ -16,10 +16,12 @@ import com.ruoyi.cai.enums.ForbidTypeEnum;
import com.ruoyi.cai.manager.ForbidCache;
import com.ruoyi.cai.mapper.UserForbidMapper;
import com.ruoyi.cai.mq.AmqpProducer;
import com.ruoyi.cai.mq.CommonDelayTypeEnum;
import com.ruoyi.cai.mq.dto.CommonDelayDto;
import com.ruoyi.cai.service.AnchorService;
import com.ruoyi.cai.service.UserForbidService;
import com.ruoyi.cai.service.UserService;
import com.ruoyi.cai.util.CaiDateUtil;
import com.ruoyi.common.core.domain.PageQuery;
import com.ruoyi.common.exception.ServiceException;
import lombok.extern.slf4j.Slf4j;
@@ -83,11 +85,11 @@ public class UserForbidServiceImpl extends ServiceImpl<UserForbidMapper, UserFor
}
forbidCache.addForbid(one);
try {
// 后者 - 前者 < 12小时
long between = ChronoUnit.SECONDS.between(LocalDateTime.now(), endTime);
// 12小时
long between = CaiDateUtil.diff(endTime, LocalDateTime.now());
CommonDelayDto dto = new CommonDelayDto();
dto.setForbidId(one.getId());
dto.setType(2);
dto.setType(CommonDelayTypeEnum.USER_FORBID.getCode());
amqpProducer.sendCommonDelayMq(dto,(int)between+2);
}catch (Exception e){
log.error("延时任务发送失败!");