123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package com.xy.job;
- import cn.easyes.core.conditions.LambdaEsQueryWrapper;
- import com.xxl.job.core.biz.model.ReturnT;
- import com.xxl.job.core.handler.annotation.XxlJob;
- import com.xy.collections.list.JArrayList;
- import com.xy.collections.list.JList;
- import com.xy.entity.MqttCommand;
- import com.xy.entity.SysDictRedis;
- import com.xy.mapper.MqttCommandMapper;
- import com.xy.utils.DataTime;
- import com.xy.utils.JobUtils;
- import com.xy.utils.SysDictUtils;
- import com.xy.utils.enums.DictEnum;
- import com.xy.utils.enums.MqttCommandStatusEnum;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import java.util.Arrays;
- import java.util.List;
- /**
- * 指令定时器
- */
- @Slf4j
- @Component
- @AllArgsConstructor
- public class MqttCommandJob {
- private MqttCommandMapper mqttCommandMapper;
- /**
- * 指令数据清除
- *
- * @return
- */
- @XxlJob("commandClean")
- public ReturnT<String> commandClean() {
- //指令保留的天数
- SysDictRedis sysDictRedis = SysDictUtils.get(DictEnum.DATA_CLEAR_SIZE.getKey(), "mqtt_command_day");
- Integer jobParam = JobUtils.getPrams().gtInt();
- Integer saveDay = jobParam != null ? jobParam
- : sysDictRedis != null ? Integer.valueOf(sysDictRedis.getValue())
- : 30;
- LambdaEsQueryWrapper<MqttCommand> lambdaEsQueryWrapper = new LambdaEsQueryWrapper<MqttCommand>()
- .le(MqttCommand::getSendTime, DataTime.getStringAround(0, 0, (~(saveDay - 1)), 0, 0, 0));
- Integer count = mqttCommandMapper.delete(lambdaEsQueryWrapper);
- log.info("已清除指令数据:{}条", count);
- return ReturnT.SUCCESS;
- }
- /**
- * 指令数据闭环
- *
- * @return
- */
- @XxlJob("commandDataCloud")
- public ReturnT<String> commandDataCloud() {
- //过期时间小于当前时间的阈值,单位s
- SysDictRedis sysDictRedis = SysDictUtils.get(DictEnum.DATA_CLEAR_SIZE.getKey(), "mqtt_command_cloud");
- Integer jobParam = JobUtils.getPrams().gtInt();
- Integer threshold = jobParam != null ? jobParam
- : sysDictRedis != null ? Integer.valueOf(sysDictRedis.getValue())
- : 30;
- LambdaEsQueryWrapper<MqttCommand> lambdaEsQueryWrapper = new LambdaEsQueryWrapper<MqttCommand>()
- .in(MqttCommand::getStatus, Arrays.asList(MqttCommandStatusEnum.COMMAND_SEND.getKey(), MqttCommandStatusEnum.COMMAND_SEND_FAIL.getKey()))
- .le(MqttCommand::getTimeout, DataTime.getStringAround(0, 0, 0, 0, 0, (~(threshold - 1))));
- List<MqttCommand> mqttCommands = mqttCommandMapper.selectList(lambdaEsQueryWrapper);
- if (mqttCommands.size() == 0) {
- return ReturnT.SUCCESS;
- }
- JList<MqttCommand> list = new JArrayList<>(mqttCommands);
- //发送但没有回执的数据
- JList<MqttCommand> sends = list.filter().eq(MqttCommand::getStatus, MqttCommandStatusEnum.COMMAND_SEND.getKey()).list();
- sends.forEach(mqttCommand -> mqttCommand.setStatus(Integer.parseInt(MqttCommandStatusEnum.COMMAND_EXC_TIMEOUT.getKey())));
- mqttCommandMapper.updateBatchByIds(sends);
- log.info("已处理发送但没有回执的指令数据:{}条", sends.size());
- //发送失败的数据
- JList<MqttCommand> sendFails = list.filter().eq(MqttCommand::getStatus, MqttCommandStatusEnum.COMMAND_SEND_FAIL.getKey()).list();
- //todo 计数报表
- return ReturnT.SUCCESS;
- }
- }
|