MqttConsumeErrorDataConsumer.java 1.1 KB

123456789101112131415161718192021222324252627282930313233
  1. package com.xy.consumer.me;
  2. import cn.hutool.json.JSONUtil;
  3. import com.xy.config.SysThreadPoolConfig;
  4. import com.xy.consumer.MqttConsumer;
  5. import com.xy.entity.MqttConsumeErrorData;
  6. import com.xy.mapper.MqttConsumeErrorDataMapper;
  7. import com.xy.utils.ThreadPoolUtils;
  8. import lombok.RequiredArgsConstructor;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.stereotype.Service;
  11. /**
  12. * mqtt消费失败数据消费者
  13. */
  14. @Slf4j
  15. @Service
  16. @RequiredArgsConstructor
  17. public class MqttConsumeErrorDataConsumer implements MqttConsumer {
  18. private final MqttConsumeErrorDataMapper mqttConsumeErrorDataMapper;
  19. @Override
  20. public boolean message(String topic, String payload) {
  21. log.info("mqtt消费失败数据:{}", payload);
  22. ThreadPoolUtils.excPoll(SysThreadPoolConfig.MQTT_CONSUME_ERROR_DATA, 1)
  23. .execute(() -> {
  24. MqttConsumeErrorData mqttConsumeErrorData = JSONUtil.parseObj(payload).toBean(MqttConsumeErrorData.class);
  25. mqttConsumeErrorDataMapper.insert(mqttConsumeErrorData);
  26. });
  27. return true;
  28. }
  29. }