package com.xy.consumer.me; import cn.hutool.json.JSONUtil; import com.xy.config.SysThreadPoolConfig; import com.xy.consumer.MqttConsumer; import com.xy.entity.MqttConsumeErrorData; import com.xy.mapper.MqttConsumeErrorDataMapper; import com.xy.utils.ThreadPoolUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; /** * mqtt消费失败数据消费者 */ @Slf4j @Service @RequiredArgsConstructor public class MqttConsumeErrorDataConsumer implements MqttConsumer { private final MqttConsumeErrorDataMapper mqttConsumeErrorDataMapper; @Override public boolean message(String topic, String payload) { log.info("mqtt消费失败数据:{}", payload); ThreadPoolUtils.excPoll(SysThreadPoolConfig.MQTT_CONSUME_ERROR_DATA, 1) .execute(() -> { MqttConsumeErrorData mqttConsumeErrorData = JSONUtil.parseObj(payload).toBean(MqttConsumeErrorData.class); mqttConsumeErrorDataMapper.insert(mqttConsumeErrorData); }); return true; } }