123456789101112131415161718192021222324252627282930313233 |
- package com.xy.consumer.me;
- import cn.hutool.json.JSONUtil;
- import com.xy.config.SysThreadPoolConfig;
- import com.xy.consumer.MqttConsumer;
- import com.xy.entity.MqttConsumeErrorData;
- import com.xy.mapper.MqttConsumeErrorDataMapper;
- import com.xy.utils.ThreadPoolUtils;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- /**
- * mqtt消费失败数据消费者
- */
- @Slf4j
- @Service
- @RequiredArgsConstructor
- public class MqttConsumeErrorDataConsumer implements MqttConsumer {
- private final MqttConsumeErrorDataMapper mqttConsumeErrorDataMapper;
- @Override
- public boolean message(String topic, String payload) {
- log.info("mqtt消费失败数据:{}", payload);
- ThreadPoolUtils.excPoll(SysThreadPoolConfig.MQTT_CONSUME_ERROR_DATA, 1)
- .execute(() -> {
- MqttConsumeErrorData mqttConsumeErrorData = JSONUtil.parseObj(payload).toBean(MqttConsumeErrorData.class);
- mqttConsumeErrorDataMapper.insert(mqttConsumeErrorData);
- });
- return true;
- }
- }
|