MqttCommandJob.java 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package com.xy.job;
  2. import cn.easyes.core.conditions.LambdaEsQueryWrapper;
  3. import com.xxl.job.core.biz.model.ReturnT;
  4. import com.xxl.job.core.handler.annotation.XxlJob;
  5. import com.xy.collections.list.JArrayList;
  6. import com.xy.collections.list.JList;
  7. import com.xy.entity.MqttCommand;
  8. import com.xy.entity.SysDictRedis;
  9. import com.xy.mapper.MqttCommandMapper;
  10. import com.xy.utils.DataTime;
  11. import com.xy.utils.JobUtils;
  12. import com.xy.utils.SysDictUtils;
  13. import com.xy.utils.enums.DictEnum;
  14. import com.xy.utils.enums.MqttCommandStatusEnum;
  15. import lombok.AllArgsConstructor;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.springframework.stereotype.Component;
  18. import java.util.Arrays;
  19. import java.util.List;
  20. /**
  21. * 指令定时器
  22. */
  23. @Slf4j
  24. @Component
  25. @AllArgsConstructor
  26. public class MqttCommandJob {
  27. private MqttCommandMapper mqttCommandMapper;
  28. /**
  29. * 指令数据清除
  30. *
  31. * @return
  32. */
  33. @XxlJob("commandClean")
  34. public ReturnT<String> commandClean() {
  35. //指令保留的天数
  36. SysDictRedis sysDictRedis = SysDictUtils.get(DictEnum.DATA_CLEAR_SIZE.getKey(), "mqtt_command_day");
  37. Integer jobParam = JobUtils.getPrams().gtInt();
  38. Integer saveDay = jobParam != null ? jobParam
  39. : sysDictRedis != null ? Integer.valueOf(sysDictRedis.getValue())
  40. : 30;
  41. LambdaEsQueryWrapper<MqttCommand> lambdaEsQueryWrapper = new LambdaEsQueryWrapper<MqttCommand>()
  42. .le(MqttCommand::getSendTime, DataTime.getStringAround(0, 0, (~(saveDay - 1)), 0, 0, 0));
  43. Integer count = mqttCommandMapper.delete(lambdaEsQueryWrapper);
  44. log.info("已清除指令数据:{}条", count);
  45. return ReturnT.SUCCESS;
  46. }
  47. /**
  48. * 指令数据闭环
  49. *
  50. * @return
  51. */
  52. @XxlJob("commandDataCloud")
  53. public ReturnT<String> commandDataCloud() {
  54. //过期时间小于当前时间的阈值,单位s
  55. SysDictRedis sysDictRedis = SysDictUtils.get(DictEnum.DATA_CLEAR_SIZE.getKey(), "mqtt_command_cloud");
  56. Integer jobParam = JobUtils.getPrams().gtInt();
  57. Integer threshold = jobParam != null ? jobParam
  58. : sysDictRedis != null ? Integer.valueOf(sysDictRedis.getValue())
  59. : 30;
  60. LambdaEsQueryWrapper<MqttCommand> lambdaEsQueryWrapper = new LambdaEsQueryWrapper<MqttCommand>()
  61. .in(MqttCommand::getStatus, Arrays.asList(MqttCommandStatusEnum.COMMAND_SEND.getKey(), MqttCommandStatusEnum.COMMAND_SEND_FAIL.getKey()))
  62. .le(MqttCommand::getTimeout, DataTime.getStringAround(0, 0, 0, 0, 0, (~(threshold - 1))));
  63. List<MqttCommand> mqttCommands = mqttCommandMapper.selectList(lambdaEsQueryWrapper);
  64. if (mqttCommands.size() == 0) {
  65. return ReturnT.SUCCESS;
  66. }
  67. JList<MqttCommand> list = new JArrayList<>(mqttCommands);
  68. //发送但没有回执的数据
  69. JList<MqttCommand> sends = list.filter().eq(MqttCommand::getStatus, MqttCommandStatusEnum.COMMAND_SEND.getKey()).list();
  70. sends.forEach(mqttCommand -> mqttCommand.setStatus(Integer.parseInt(MqttCommandStatusEnum.COMMAND_EXC_TIMEOUT.getKey())));
  71. mqttCommandMapper.updateBatchByIds(sends);
  72. log.info("已处理发送但没有回执的指令数据:{}条", sends.size());
  73. //发送失败的数据
  74. JList<MqttCommand> sendFails = list.filter().eq(MqttCommand::getStatus, MqttCommandStatusEnum.COMMAND_SEND_FAIL.getKey()).list();
  75. //todo 计数报表
  76. return ReturnT.SUCCESS;
  77. }
  78. }