package com.xy.job;

import cn.easyes.core.conditions.LambdaEsQueryWrapper;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xy.collections.list.JArrayList;
import com.xy.collections.list.JList;
import com.xy.entity.MqttCommand;
import com.xy.entity.SysDictRedis;
import com.xy.mapper.MqttCommandMapper;
import com.xy.utils.DataTime;
import com.xy.utils.JobUtils;
import com.xy.utils.SysDictUtils;
import com.xy.utils.enums.DictEnum;
import com.xy.utils.enums.MqttCommandStatusEnum;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

/**
 * Scheduled jobs that maintain stored MQTT command records.
 *
 * <p>Two XXL-Job handlers are exposed:
 * <ul>
 *   <li>{@code commandClean} deletes command records older than a retention period;</li>
 *   <li>{@code commandDataCloud} closes out commands whose timeout has elapsed.</li>
 * </ul>
 *
 * <p>Both handlers take one integer setting, resolved in this order: the job parameter,
 * then the dictionary entry under {@code DictEnum.DATA_CLEAR_SIZE}, then
 * {@link #DEFAULT_SETTING}.
 */
@Slf4j
@Component
@AllArgsConstructor
public class MqttCommandJob {

    /** Fallback used when neither the job parameter nor the dictionary supplies a value. */
    private static final int DEFAULT_SETTING = 30;

    /** Dictionary code holding the number of days command records are retained. */
    private static final String RETENTION_DAYS_CODE = "mqtt_command_day";

    /** Dictionary code holding the timeout threshold, in seconds. */
    private static final String TIMEOUT_THRESHOLD_CODE = "mqtt_command_cloud";

    /** Mapper for command records; injected through the Lombok-generated constructor. */
    private final MqttCommandMapper mqttCommandMapper;

    /**
     * Deletes command records whose send time is at or before the retention cutoff.
     *
     * @return {@code ReturnT.SUCCESS} after the delete has run; {@code ReturnT.FAIL} when
     *         the configured retention is negative, in which case nothing is deleted
     */
    @XxlJob("commandClean")
    public ReturnT<String> commandClean() {
        // Number of days command records are retained.
        int saveDay = resolveSetting(RETENTION_DAYS_CODE);
        if (saveDay < 0) {
            // A negative retention would move the cutoff into the future and match every record.
            log.warn("commandClean skipped: retention days must not be negative, got {}", saveDay);
            return ReturnT.FAIL;
        }

        // NOTE(review): the arguments of DataTime.getStringAround are presumably offsets from
        // "now" ordered year, month, day, hour, minute, second - confirm against DataTime.
        // -saveDay is the same value the previous ~(saveDay - 1) expression produced.
        LambdaEsQueryWrapper<MqttCommand> expiredQuery = new LambdaEsQueryWrapper<MqttCommand>()
                .le(MqttCommand::getSendTime, DataTime.getStringAround(0, 0, -saveDay, 0, 0, 0));

        Integer count = mqttCommandMapper.delete(expiredQuery);
        log.info("已清除指令数据:{}条", count);
        return ReturnT.SUCCESS;
    }

    /**
     * Closes out commands whose timeout has elapsed.
     *
     * <p>Selects commands in status {@code COMMAND_SEND} or {@code COMMAND_SEND_FAIL} whose
     * timeout is at or before the cutoff, then sets the {@code COMMAND_SEND} ones to
     * {@code COMMAND_EXC_TIMEOUT} and saves them in one batch update.
     *
     * @return {@code ReturnT.SUCCESS} when processing completes or nothing matched;
     *         {@code ReturnT.FAIL} when the configured threshold is negative, in which case
     *         nothing is modified
     */
    @XxlJob("commandDataCloud")
    public ReturnT<String> commandDataCloud() {
        // How far (in seconds) the command timeout must lie before the current time.
        int threshold = resolveSetting(TIMEOUT_THRESHOLD_CODE);
        if (threshold < 0) {
            // A negative threshold would select commands whose timeout has not elapsed yet.
            log.warn("commandDataCloud skipped: threshold must not be negative, got {}", threshold);
            return ReturnT.FAIL;
        }

        LambdaEsQueryWrapper<MqttCommand> timedOutQuery = new LambdaEsQueryWrapper<MqttCommand>()
                .in(MqttCommand::getStatus, Arrays.asList(
                        MqttCommandStatusEnum.COMMAND_SEND.getKey(),
                        MqttCommandStatusEnum.COMMAND_SEND_FAIL.getKey()))
                .le(MqttCommand::getTimeout, DataTime.getStringAround(0, 0, 0, 0, 0, -threshold));

        List<MqttCommand> mqttCommands = mqttCommandMapper.selectList(timedOutQuery);
        if (mqttCommands == null || mqttCommands.isEmpty()) {
            return ReturnT.SUCCESS;
        }

        JList<MqttCommand> list = new JArrayList<>(mqttCommands);

        // Commands that were sent but never acknowledged.
        // NOTE(review): setStatus below receives an int while this filter compares against the
        // enum's String key - confirm that JList's eq() matches across those types, otherwise
        // this list is always empty.
        JList<MqttCommand> sends = list.filter()
                .eq(MqttCommand::getStatus, MqttCommandStatusEnum.COMMAND_SEND.getKey())
                .list();

        // Parse the target status once instead of once per element.
        int timeoutStatus = Integer.parseInt(MqttCommandStatusEnum.COMMAND_EXC_TIMEOUT.getKey());
        sends.forEach(command -> command.setStatus(timeoutStatus));
        mqttCommandMapper.updateBatchByIds(sends);
        log.info("已处理发送但没有回执的指令数据:{}条", sends.size());

        // TODO: counting report for commands in status COMMAND_SEND_FAIL (send failed).
        return ReturnT.SUCCESS;
    }

    /**
     * Resolves an integer job setting: job parameter first, then the dictionary entry,
     * then {@link #DEFAULT_SETTING}.
     *
     * @param dictCode dictionary code looked up under {@code DictEnum.DATA_CLEAR_SIZE}
     * @return the resolved setting
     * @throws NumberFormatException if the dictionary value is used and is not a valid integer
     */
    private static int resolveSetting(String dictCode) {
        SysDictRedis dictEntry = SysDictUtils.get(DictEnum.DATA_CLEAR_SIZE.getKey(), dictCode);
        Integer jobParam = JobUtils.getPrams().gtInt();
        if (jobParam != null) {
            return jobParam;
        }
        if (dictEntry != null) {
            return Integer.parseInt(dictEntry.getValue());
        }
        return DEFAULT_SETTING;
    }
}