From 135f45d2c307104eba7f99fcccda611773031abb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E8=89=AF=28004796=29?= Date: Tue, 30 Jan 2024 11:39:38 +0800 Subject: [PATCH] 123 --- .../cai/controller/SysPushController.java | 34 ++- .../cai/dto/admin/vo/syspush/MinUserDTO.java | 19 ++ .../systempush/SystemPushGroupIdEnum.java | 10 + ...atus.java => SystemPushLogStatusEnum.java} | 4 +- .../SystemPushSendTimeTypeEnum.java | 10 + .../systempush/SystemPushStatusEnum.java | 2 + .../java/com/ruoyi/cai/job/ForbidJob.java | 8 + .../ruoyi/cai/manager/UserForbidManager.java | 8 +- .../com/ruoyi/cai/mq/CommonDelayTypeEnum.java | 1 + .../mq/consumer/CommonDelayMqConsumer.java | 10 + .../ruoyi/cai/mq/dto/CommonConsumerEnum.java | 3 +- .../com/ruoyi/cai/mq/dto/CommonDelayDto.java | 1 + .../ruoyi/cai/mq/handle/SysPushHandle.java | 31 +++ .../ruoyi/cai/mq/handle/dto/SysPushDTO.java | 14 + .../com/ruoyi/cai/service/SysPushService.java | 15 ++ .../cai/service/impl/SysPushServiceImpl.java | 250 +++++++++++++++++- .../service/impl/UserForbidServiceImpl.java | 8 +- .../java/com/ruoyi/cai/util/CaiDateUtil.java | 16 +- 18 files changed, 407 insertions(+), 37 deletions(-) create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/dto/admin/vo/syspush/MinUserDTO.java rename ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/{SystemPushLogStatus.java => SystemPushLogStatusEnum.java} (80%) create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/mq/handle/SysPushHandle.java create mode 100644 ruoyi-cai/src/main/java/com/ruoyi/cai/mq/handle/dto/SysPushDTO.java diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/SysPushController.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/SysPushController.java index 96953a96..b0b5e3d2 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/SysPushController.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/controller/SysPushController.java @@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.ruoyi.cai.domain.SysPush; import com.ruoyi.cai.dto.admin.query.syspush.SysPushParams; +import com.ruoyi.cai.dto.app.query.IdRes; import com.ruoyi.cai.service.SysPushService; import com.ruoyi.common.annotation.Log; import com.ruoyi.common.annotation.RepeatSubmit; @@ -12,8 +13,6 @@ import com.ruoyi.common.core.controller.BaseController; import com.ruoyi.common.core.domain.PageQuery; import com.ruoyi.common.core.domain.R; import com.ruoyi.common.core.page.TableDataInfo; -import com.ruoyi.common.core.validate.AddGroup; -import com.ruoyi.common.core.validate.EditGroup; import com.ruoyi.common.enums.BusinessType; import lombok.RequiredArgsConstructor; import org.springframework.validation.annotation.Validated; @@ -60,6 +59,24 @@ public class SysPushController extends BaseController { return R.ok(sysPushService.getById(id)); } + @SaCheckPermission("cai:sysPush:run") + @Log(title = "推送系统消息执行", businessType = BusinessType.INSERT) + @RepeatSubmit() + @PostMapping("/run") + public R run(@RequestBody IdRes idRes) { + sysPushService.handStart(idRes.getId()); + return R.ok(); + } + + @SaCheckPermission("cai:sysPush:close") + @Log(title = "取消定时系统消息执行", businessType = BusinessType.INSERT) + @RepeatSubmit() + @PostMapping("/close") + public R close(@RequestBody IdRes idRes) { + sysPushService.closeSysPush(idRes.getId()); + return R.ok(); + } + /** * 新增推送系统消息 */ @@ -67,22 +84,11 @@ public class SysPushController extends BaseController { @Log(title = "推送系统消息", businessType = BusinessType.INSERT) @RepeatSubmit() @PostMapping() - public R add(@Validated(AddGroup.class) @RequestBody SysPushParams params) { + public R add(@RequestBody SysPushParams params) { sysPushService.saveSysPush(params); return R.ok(); } - /** - * 修改推送系统消息 - */ - @SaCheckPermission("cai:sysPush:edit") - @Log(title = "推送系统消息", businessType = BusinessType.UPDATE) - @RepeatSubmit() - @PutMapping() - public R edit(@Validated(EditGroup.class) @RequestBody SysPush bo) { - return toAjax(sysPushService.updateById(bo)); - } - /** * 删除推送系统消息 * diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/dto/admin/vo/syspush/MinUserDTO.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/dto/admin/vo/syspush/MinUserDTO.java new file mode 100644 index 00000000..ad5d04ea --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/dto/admin/vo/syspush/MinUserDTO.java @@ -0,0 +1,19 @@ +package com.ruoyi.cai.dto.admin.vo.syspush; + +import lombok.Data; + +@Data +public class MinUserDTO { + private Long userId; + private String usercode; + private String nickname; + + public MinUserDTO(Long userId, String usercode, String nickname) { + this.userId = userId; + this.usercode = usercode; + this.nickname = nickname; + } + + public MinUserDTO() { + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushGroupIdEnum.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushGroupIdEnum.java index f362aa99..408500e0 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushGroupIdEnum.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushGroupIdEnum.java @@ -18,4 +18,14 @@ public enum SystemPushGroupIdEnum { this.code = code; this.text = text; } + + public static SystemPushGroupIdEnum getByCode(Integer groupId) { + SystemPushGroupIdEnum[] values = SystemPushGroupIdEnum.values(); + for (SystemPushGroupIdEnum value : values) { + if(value.getCode().equals(groupId)){ + return value; + } + } + return null; + } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushLogStatus.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushLogStatusEnum.java similarity index 80% rename from ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushLogStatus.java rename to ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushLogStatusEnum.java index b426fb50..58a789c5 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushLogStatus.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushLogStatusEnum.java @@ -4,7 +4,7 @@ import lombok.Getter; // 推送状态:0=未推送,1=推送中,2=推送失败,3=推送成功 @Getter -public enum SystemPushLogStatus { +public enum SystemPushLogStatusEnum { NO_SEND(0,"未推送"), SENDING(1,"推送中"), SEND_FAIL(2,"推送失败"), @@ -13,7 +13,7 @@ public enum SystemPushLogStatus { private final Integer code; private final String text; - SystemPushLogStatus(Integer code, String text) { + SystemPushLogStatusEnum(Integer code, String text) { this.code = code; this.text = text; } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushSendTimeTypeEnum.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushSendTimeTypeEnum.java index 08a6c86e..439072b6 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushSendTimeTypeEnum.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushSendTimeTypeEnum.java @@ -16,4 +16,14 @@ public enum SystemPushSendTimeTypeEnum { this.code = code; this.text = text; } + + public static SystemPushSendTimeTypeEnum getByCode(Integer sendTimeType) { + SystemPushSendTimeTypeEnum[] values = SystemPushSendTimeTypeEnum.values(); + for (SystemPushSendTimeTypeEnum value : values) { + if(value.getCode().equals(sendTimeType)){ + return value; + } + } + return null; + } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushStatusEnum.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushStatusEnum.java index 81c67a4c..d1e5968a 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushStatusEnum.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/enums/systempush/SystemPushStatusEnum.java @@ -9,6 +9,8 @@ public enum SystemPushStatusEnum { RUNNING(1,"执行中"), QUEUE_RUNNING(2,"队列执行中"), COMPLETE(3,"已完成"), + CLOSE(4,"已取消"), + FAIL(10,"执行失败"), ; private final Integer code; private final String text; diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/job/ForbidJob.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/job/ForbidJob.java index 504cad1d..7d2a71ad 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/job/ForbidJob.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/job/ForbidJob.java @@ -1,6 +1,7 @@ package com.ruoyi.cai.job; import com.ruoyi.cai.manager.UserForbidManager; +import com.ruoyi.cai.service.SysPushService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; @@ -12,6 +13,8 @@ public class ForbidJob { @Autowired private UserForbidManager userForbidManager; + @Autowired + private SysPushService sysPushService; /** * 每12小时执行一次 @@ -23,6 +26,11 @@ public class ForbidJob { }catch (Exception e){ log.error("黑名单失效检测失败!",e); } + try { + sysPushService.checkAll(); + }catch (Exception e){ + log.error("系统推送定时任务检测失败!",e); + } } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/manager/UserForbidManager.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/manager/UserForbidManager.java index 17d29148..1edfc706 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/manager/UserForbidManager.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/manager/UserForbidManager.java @@ -9,8 +9,10 @@ import com.ruoyi.cai.dto.admin.query.UserForbidReq; import com.ruoyi.cai.enums.ForbidTimeEnum; import com.ruoyi.cai.enums.ForbidTypeEnum; import com.ruoyi.cai.mq.AmqpProducer; +import com.ruoyi.cai.mq.CommonDelayTypeEnum; import com.ruoyi.cai.mq.dto.CommonDelayDto; import com.ruoyi.cai.service.*; +import com.ruoyi.cai.util.CaiDateUtil; import com.ruoyi.common.utils.StringUtils; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -197,12 +199,12 @@ public class UserForbidManager { if(userForbid == null || userForbid.getForbidStatus() == 2){ return; } - // 后者 - 前者 < 12小时 - long between = ChronoUnit.SECONDS.between(LocalDateTime.now(), userForbid.getEndTime()); + // < 12小时 + long between = CaiDateUtil.diff(userForbid.getEndTime(),LocalDateTime.now()); if(between > 0 && between < 60*60*12){ CommonDelayDto dto = new CommonDelayDto(); dto.setForbidId(expireId); - dto.setType(2); + dto.setType(CommonDelayTypeEnum.USER_FORBID.getCode()); amqpProducer.sendCommonDelayMq(dto,(int)between+5); } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayTypeEnum.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayTypeEnum.java index 83d6a1df..14a34224 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayTypeEnum.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/CommonDelayTypeEnum.java @@ -5,6 +5,7 @@ import lombok.Getter; @Getter public enum CommonDelayTypeEnum { USER_FORBID(1), + SYS_PUSH(2), ; private final Integer code; diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java index aa0bf6c6..ee145c13 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/consumer/CommonDelayMqConsumer.java @@ -5,9 +5,11 @@ import com.ruoyi.cai.manager.UserForbidManager; import com.ruoyi.cai.mq.CommonDelayTypeEnum; import com.ruoyi.cai.mq.config.CommonDelayMqConfig; import com.ruoyi.cai.mq.dto.CommonDelayDto; +import com.ruoyi.cai.service.SysPushService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.Cache; import org.springframework.stereotype.Component; @Slf4j @@ -15,6 +17,8 @@ import org.springframework.stereotype.Component; public class CommonDelayMqConsumer { @Autowired private UserForbidManager userForbidManager; + @Autowired + private SysPushService sysPushService; @RabbitListener(queues = CommonDelayMqConfig.QUEUE_NAME ,containerFactory = "customContainerFactory") @@ -34,6 +38,12 @@ public class CommonDelayMqConsumer { log.error("检查用户封禁状态失败!需要开发确认!",e); } break; + case SYS_PUSH: + try { + sysPushService.fastStart(dto.getSysPushId()); + }catch (Exception e){ + log.error("检查系统推送失败!需要开发确认!", e); + } default: break; } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonConsumerEnum.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonConsumerEnum.java index dbcd42a9..53cbd091 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonConsumerEnum.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonConsumerEnum.java @@ -1,5 +1,6 @@ package com.ruoyi.cai.mq.dto; public enum CommonConsumerEnum { - RANK,WINDOW_GIFT,WINDOW_RECHARGE + RANK,WINDOW_GIFT,WINDOW_RECHARGE, + SYS_PUSH } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonDelayDto.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonDelayDto.java index 1ba3b6ed..f106161c 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonDelayDto.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/dto/CommonDelayDto.java @@ -10,6 +10,7 @@ public class CommonDelayDto { */ private Integer type; private Long forbidId; + private Long sysPushId; public CommonDelayTypeEnum getTypeEnum(){ CommonDelayTypeEnum[] values = CommonDelayTypeEnum.values(); diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/handle/SysPushHandle.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/handle/SysPushHandle.java new file mode 100644 index 00000000..a120e16c --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/handle/SysPushHandle.java @@ -0,0 +1,31 @@ +package com.ruoyi.cai.mq.handle; + +import com.alibaba.fastjson2.JSON; +import com.ruoyi.cai.mq.dto.CommonConsumerEnum; +import com.ruoyi.cai.mq.handle.dto.SysPushDTO; +import com.ruoyi.cai.service.SysPushService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +public class SysPushHandle implements IHandle{ + @Autowired + private SysPushService sysPushService; + + @Override + public void run(String message) { + try { + SysPushDTO sysPush = JSON.parseObject(message, SysPushDTO.class); + sysPushService.doRun(sysPush.getSysPushId()); + }catch (Exception e){ + log.error("处理推送消息失败!",e); + } + } + + @Override + public CommonConsumerEnum getType() { + return CommonConsumerEnum.SYS_PUSH; + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/handle/dto/SysPushDTO.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/handle/dto/SysPushDTO.java new file mode 100644 index 00000000..ef5d03fe --- /dev/null +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/mq/handle/dto/SysPushDTO.java @@ -0,0 +1,14 @@ +package com.ruoyi.cai.mq.handle.dto; + +import com.ruoyi.cai.mq.dto.CommonConsumerEnum; +import com.ruoyi.cai.mq.dto.CommonDTO; +import lombok.Data; + +@Data +public class SysPushDTO extends CommonDTO { + private Long sysPushId; + + public SysPushDTO() { + this.setType(CommonConsumerEnum.SYS_PUSH); + } +} diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/SysPushService.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/SysPushService.java index 9d9d4421..9549b917 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/SysPushService.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/SysPushService.java @@ -3,6 +3,9 @@ package com.ruoyi.cai.service; import com.baomidou.mybatisplus.extension.service.IService; import com.ruoyi.cai.domain.SysPush; import com.ruoyi.cai.dto.admin.query.syspush.SysPushParams; +import com.ruoyi.cai.dto.admin.vo.syspush.SendSysPushResp; + +import java.util.List; /** * 推送系统消息Service接口 @@ -13,4 +16,16 @@ import com.ruoyi.cai.dto.admin.query.syspush.SysPushParams; public interface SysPushService extends IService { void saveSysPush(SysPushParams params); + + void fastStart(Long id); + + void handStart(Long id); + + void doRun(Long id); + + SendSysPushResp sendSysPush(SysPush sysPush, List userIds); + + void checkAll(); + + void closeSysPush(Long id); } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/SysPushServiceImpl.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/SysPushServiceImpl.java index 3c923f8f..59fca693 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/SysPushServiceImpl.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/SysPushServiceImpl.java @@ -2,18 +2,27 @@ package com.ruoyi.cai.service.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.TypeReference; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.google.common.collect.Lists; import com.ruoyi.cai.domain.SysPush; +import com.ruoyi.cai.domain.SysPushLog; import com.ruoyi.cai.domain.User; import com.ruoyi.cai.dto.admin.query.syspush.SysPushImageContextDTO; import com.ruoyi.cai.dto.admin.query.syspush.SysPushMasterDTO; import com.ruoyi.cai.dto.admin.query.syspush.SysPushParams; +import com.ruoyi.cai.dto.admin.vo.syspush.MinUserDTO; import com.ruoyi.cai.dto.admin.vo.syspush.SendSysPushResp; +import com.ruoyi.cai.enums.GenderEnum; import com.ruoyi.cai.enums.SystemConfigEnum; -import com.ruoyi.cai.enums.systempush.SystemPushStatusEnum; -import com.ruoyi.cai.enums.systempush.SystemPushTypeEnum; +import com.ruoyi.cai.enums.systempush.*; import com.ruoyi.cai.manager.SystemConfigManager; import com.ruoyi.cai.mapper.SysPushMapper; +import com.ruoyi.cai.mq.AmqpProducer; +import com.ruoyi.cai.mq.CommonDelayTypeEnum; +import com.ruoyi.cai.mq.dto.CommonDelayDto; +import com.ruoyi.cai.mq.handle.dto.SysPushDTO; import com.ruoyi.cai.notice.data.NoticeMsgR; import com.ruoyi.cai.notice.data.NoticeOnlyImageR; import com.ruoyi.cai.notice.data.NoticeR; @@ -24,17 +33,23 @@ import com.ruoyi.cai.notice.data.child.SimpleImageTextData; import com.ruoyi.cai.notice.enums.MessageBaseTypeEnum; import com.ruoyi.cai.service.SysPushService; import com.ruoyi.cai.service.UserService; +import com.ruoyi.cai.util.CaiDateUtil; import com.ruoyi.common.exception.ServiceException; import com.ruoyi.common.utils.BeanConvertUtil; import com.ruoyi.yunxin.Yunxin; import com.ruoyi.yunxin.enums.ImTypeEnum; import com.ruoyi.yunxin.resp.YxCommonR; import com.ruoyi.yunxin.resp.YxDataR; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; /** * 推送系统消息Service业务层处理 @@ -43,6 +58,7 @@ import java.util.List; * @date 2024-01-28 */ @Service +@Slf4j public class SysPushServiceImpl extends ServiceImpl implements SysPushService { @Autowired @@ -51,6 +67,8 @@ public class SysPushServiceImpl extends ServiceImpl impl private Yunxin yunxin; @Autowired private SystemConfigManager systemConfigManager; + @Autowired + private AmqpProducer amqpProducer; @Override public void saveSysPush(SysPushParams params) { @@ -60,19 +78,38 @@ public class SysPushServiceImpl extends ServiceImpl impl if(systemPushType == null){ throw new ServiceException("消息类型有误!"); } - User user = userService.getByUserCode(master.getUsercode()); - if(user == null){ - throw new ServiceException("蜜瓜号不存在!"); + SystemPushGroupIdEnum groupIdEnum = SystemPushGroupIdEnum.getByCode(master.getGroupId()); + if(groupIdEnum == null){ + throw new ServiceException("发送类型有误!"); } SysPush sysPush = new SysPush(); + if(groupIdEnum == SystemPushGroupIdEnum.ONE_USER){ + User user = userService.getByUserCode(master.getUsercode()); + if(user == null){ + throw new ServiceException("蜜瓜号不存在!"); + } + sysPush.setUserId(user.getId()); + sysPush.setUsercode(user.getUsercode()); + } + SystemPushSendTimeTypeEnum systemPushSendTime = SystemPushSendTimeTypeEnum.getByCode(master.getSendTimeType()); + if(systemPushSendTime == null){ + throw new ServiceException("发送方式有误!"); + } + sysPush.setSendTimeType(systemPushSendTime.getCode()); + if(systemPushSendTime == SystemPushSendTimeTypeEnum.SCHEDULED_SEND){ + long between = CaiDateUtil.diff(sysPush.getEndTime(), LocalDateTime.now()); + if(between < 0){ + throw new ServiceException("定时发送时间不能少于当前时间"); + } + if(between < 60*3){ + throw new ServiceException("定时发送最低限制为3分钟以后"); + } + sysPush.setSendTime(master.getSendTime()); + } sysPush.setGroupId(master.getGroupId()); sysPush.setType(master.getType()); - sysPush.setUserId(user.getId()); - sysPush.setUsercode(master.getUsercode()); sysPush.setTitle(master.getTitle()); sysPush.setRemark(master.getRemark()); - sysPush.setSendTimeType(master.getSendTimeType()); - sysPush.setSendTime(master.getSendTime()); sysPush.setStatus(SystemPushStatusEnum.NO_RUN.getCode()); String context = null; switch (systemPushType){ @@ -135,9 +172,146 @@ public class SysPushServiceImpl extends ServiceImpl impl sysPush.setContent(context); sysPush.setParams(JSON.toJSONString(params)); this.save(sysPush); + try { + // 小于 < 12小时 + long between = CaiDateUtil.diff(sysPush.getEndTime(), LocalDateTime.now()); + CommonDelayDto dto = new CommonDelayDto(); + dto.setSysPushId(sysPush.getId()); + dto.setType(CommonDelayTypeEnum.SYS_PUSH.getCode()); + amqpProducer.sendCommonDelayMq(dto,(int)between+2); + }catch (Exception e){ + log.error("延时任务发送失败!"); + } } + @Override + public void fastStart(Long id){ + boolean update = this.update(Wrappers.lambdaUpdate(SysPush.class) + .eq(SysPush::getId, id) + .eq(SysPush::getStatus, SystemPushStatusEnum.NO_RUN.getCode()) + .set(SysPush::getStatus, SystemPushStatusEnum.RUNNING.getCode())); + if(!update){ + throw new ServiceException("正在执行中,请稍等"); + } + this.doRun(id); + } + + @Override + public void handStart(Long id){ + boolean update = this.update(Wrappers.lambdaUpdate(SysPush.class) + .eq(SysPush::getId, id) + .eq(SysPush::getStatus, SystemPushStatusEnum.NO_RUN.getCode()) + .set(SysPush::getStatus, SystemPushStatusEnum.RUNNING.getCode())); + if(!update){ + throw new ServiceException("正在执行中,请稍等"); + } + SysPushDTO sysPush = new SysPushDTO(); + sysPush.setSysPushId(id); + amqpProducer.sendCommonMq(sysPush); + } + + @Override + public void doRun(Long id){ + SysPush sysPush = this.getById(id); + Integer status = sysPush.getStatus(); + if(SystemPushStatusEnum.COMPLETE.getCode().equals(status)){ + throw new ServiceException("不允许重复执行"); + } + if(SystemPushStatusEnum.RUNNING.getCode().equals(status) + || SystemPushStatusEnum.QUEUE_RUNNING.getCode().equals(status)){ + throw new ServiceException("任务正在执行中,不允许重复执行"); + } + boolean update = this.update(Wrappers.lambdaUpdate(SysPush.class) + .eq(SysPush::getId, sysPush.getId()) + .eq(SysPush::getStatus, SystemPushStatusEnum.RUNNING.getCode()) + .set(SysPush::getStatus, SystemPushStatusEnum.COMPLETE.getCode())); + if(!update){ + throw new ServiceException("队列正在执行中"); + } + SystemPushGroupIdEnum groupId = SystemPushGroupIdEnum.getByCode(sysPush.getGroupId()); + if(groupId == null){ + this.update(Wrappers.lambdaUpdate(SysPush.class) + .eq(SysPush::getId, sysPush.getId()) + .set(SysPush::getStatus,SystemPushStatusEnum.FAIL.getCode())); + log.error("未找到执行的人! sysPush={}",JSON.toJSONString(sysPush)); + return; + } + List minUserList = new ArrayList<>(); + switch (groupId) { + case ONE_USER: + User user = userService.getById(sysPush.getUserId()); + if(user != null){ + minUserList.add(new MinUserDTO(user.getId(),user.getUsercode(),user.getNickname())); + } + break; + case MEN_USER: + List menUserList = userService.list(Wrappers.lambdaQuery(User.class) + .select(User::getId, User::getUsercode, User::getNickname) + .eq(User::getGender, GenderEnum.MAN.getCode()) + .eq(User::getStatus, 0)); + minUserList = menUserList.stream().map(i -> new MinUserDTO(i.getId(), i.getUsercode(), i.getNickname())).collect(Collectors.toList()); + break; + case WOMEN_USER: + List womenUserList = userService.list(Wrappers.lambdaQuery(User.class) + .select(User::getId, User::getUsercode, User::getNickname) + .eq(User::getGender, GenderEnum.WOMEN.getCode()) + .eq(User::getStatus, 0)); + minUserList = womenUserList.stream().map(i -> new MinUserDTO(i.getId(), i.getUsercode(), i.getNickname())).collect(Collectors.toList()); + break; + case ANCHOR_USER: + List anchorUserList = userService.list(Wrappers.lambdaQuery(User.class) + .select(User::getId, User::getUsercode, User::getNickname) + .eq(User::getIsAnchor, 1) + .eq(User::getStatus, 0)); + minUserList = anchorUserList.stream().map(i -> new MinUserDTO(i.getId(), i.getUsercode(), i.getNickname())).collect(Collectors.toList()); + break; + case ALL_USER: + List allUserList = userService.list(Wrappers.lambdaQuery(User.class) + .select(User::getId, User::getUsercode, User::getNickname) + .eq(User::getStatus, 0)); + minUserList = allUserList.stream().map(i -> new MinUserDTO(i.getId(), i.getUsercode(), i.getNickname())).collect(Collectors.toList()); + break; + default: + break; + } + if(minUserList.isEmpty()){ + this.update(Wrappers.lambdaUpdate(SysPush.class) + .eq(SysPush::getId, sysPush.getId()) + .set(SysPush::getStatus,SystemPushStatusEnum.FAIL.getCode())); + log.error("未找到发送的人! sysPush={}",JSON.toJSONString(sysPush)); + return; + } + List> partition = Lists.partition(minUserList, 500); + for (List minUsers : partition) { + SysPushLog sysPushLog = new SysPushLog(); + sysPushLog.setSysPushId(sysPush.getId()); + sysPushLog.setUserJson(JSON.toJSONString(minUsers)); + sysPushLog.setNum(minUsers.size()); + try { + List userIds = minUsers.stream().map(MinUserDTO::getUserId).collect(Collectors.toList()); + SendSysPushResp resp = this.sendSysPush(sysPush, userIds); + YxDataR imResp = resp.getImResp(); + if(imResp != null && imResp.isSuccess()){ + sysPushLog.setStatus(SystemPushLogStatusEnum.SEND_SUCCESS.getCode()); + }else{ + sysPushLog.setStatus(SystemPushLogStatusEnum.SEND_SUCCESS.getCode()); + } + sysPushLog.setResult(JSON.toJSONString(imResp)); + }catch (Exception e){ + log.error("发送失败!",e); + sysPushLog.setStatus(SystemPushLogStatusEnum.SEND_FAIL.getCode()); + sysPushLog.setResult(e.getMessage()); + } + this.save(sysPush); + } + this.update(Wrappers.lambdaUpdate(SysPush.class) + .eq(SysPush::getId, sysPush.getId()) + .set(SysPush::getStatus,SystemPushStatusEnum.COMPLETE.getCode())); + } + + + @Override public SendSysPushResp sendSysPush(SysPush sysPush, List userIds){ String cosUrl = systemConfigManager.getSystemConfig(SystemConfigEnum.COS_DOMAIN); Integer type = sysPush.getType(); @@ -196,4 +370,62 @@ public class SysPushServiceImpl extends ServiceImpl impl resp.setImResp(r); return resp; } + + + @Override + public void checkAll(){ + List expireIds = new ArrayList<>(); // 过期未执行的 + List expireConsumer = new ArrayList<>(); // 还有12小时过期的 + Page page = new Page<>(); + page.setSize(100L); + long current = 0; + while (true){ + current++; + page.setCurrent(current); + Page res = this.page(page,Wrappers.lambdaQuery(SysPush.class) + .eq(SysPush::getSendTimeType, SystemPushSendTimeTypeEnum.SCHEDULED_SEND.getCode()) + .eq(SysPush::getStatus,SystemPushStatusEnum.NO_RUN.getCode())); + List records = res.getRecords(); + if(records.isEmpty()){ + break; + } + for (SysPush record : records) { + if(record.getSendTime().isAfter(LocalDateTime.now())){ + expireIds.add(record.getId()); + }else if(record.getEndTime().plusHours(12).isAfter(LocalDateTime.now())){ // 还有12小时过期 + expireConsumer.add(record.getId()); + } + } + } + for (Long expireId : expireIds) { + try { + this.fastStart(expireId); + }catch (Exception e){ + // + } + + } + for (Long expireId : expireConsumer) { + SysPush sysPush = this.getById(expireId); + if(sysPush == null || !Objects.equals(sysPush.getStatus(), SystemPushStatusEnum.NO_RUN.getCode())){ + return; + } + // < 12小时 + long between = CaiDateUtil.diff(sysPush.getSendTime(),LocalDateTime.now()); + if(between > 0 && between < 60*60*12){ + CommonDelayDto dto = new CommonDelayDto(); + dto.setForbidId(expireId); + dto.setType(CommonDelayTypeEnum.SYS_PUSH.getCode()); + amqpProducer.sendCommonDelayMq(dto,(int)between+5); + } + } + } + + @Override + public void closeSysPush(Long id) { + this.update(Wrappers.lambdaUpdate(SysPush.class) + .eq(SysPush::getId,id) + .eq(SysPush::getStatus,SystemPushStatusEnum.NO_RUN.getCode()) + .set(SysPush::getStatus,SystemPushStatusEnum.COMPLETE.getCode())); + } } diff --git a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserForbidServiceImpl.java b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserForbidServiceImpl.java index 2cbfb9c0..7338325c 100644 --- a/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserForbidServiceImpl.java +++ b/ruoyi-cai/src/main/java/com/ruoyi/cai/service/impl/UserForbidServiceImpl.java @@ -16,10 +16,12 @@ import com.ruoyi.cai.enums.ForbidTypeEnum; import com.ruoyi.cai.manager.ForbidCache; import com.ruoyi.cai.mapper.UserForbidMapper; import com.ruoyi.cai.mq.AmqpProducer; +import com.ruoyi.cai.mq.CommonDelayTypeEnum; import com.ruoyi.cai.mq.dto.CommonDelayDto; import com.ruoyi.cai.service.AnchorService; import com.ruoyi.cai.service.UserForbidService; import com.ruoyi.cai.service.UserService; +import com.ruoyi.cai.util.CaiDateUtil; import com.ruoyi.common.core.domain.PageQuery; import com.ruoyi.common.exception.ServiceException; import lombok.extern.slf4j.Slf4j; @@ -83,11 +85,11 @@ public class UserForbidServiceImpl extends ServiceImpl