Преглед на файлове

mqtt消费者失败放入es

李进 преди 1 година
родител
ревизия
86fd305e70

+ 0 - 26
sys-sdk/src/main/java/com/xy/producer/MqttConsumeErrorDataProducer.java

@@ -1,26 +0,0 @@
-package com.xy.producer;
-
-import org.springframework.integration.annotation.MessagingGateway;
-import org.springframework.integration.mqtt.support.MqttHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.stereotype.Component;
-
-/**
- * 生产者接口
- */
-@Component
-@MessagingGateway(defaultRequestChannel = MqttConsumeErrorDataProducerConfiguration.CHANNEL_NAME_OUT)
-public interface MqttConsumeErrorDataProducer {
-
-    /**
-     * ata是发送消息的内容
-     * topic是消息发送的主题,就是配置文件的主题
-     * qos是mqtt 对消息处理机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
-     */
-
-    void sendToMqtt(String data);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
-}

+ 0 - 46
sys-sdk/src/main/java/com/xy/producer/MqttConsumeErrorDataProducerConfiguration.java

@@ -1,46 +0,0 @@
-package com.xy.producer;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class MqttConsumeErrorDataProducerConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "mqttConsumeErrorData";
-
-    /**
-     * 出站通道名(生产者)发布的bean名称
-     */
-    public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
-
-
-    /*******************************生产者*******************************************/
-
-    /**
-     * MQTT信息通道(生产者)
-     */
-    @Bean(name = CHANNEL_NAME_OUT)
-    public MessageChannel mqttOutboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息处理器(生产者)
-     */
-    @Bean(name = TOPIC + "MqttOutbound")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
-    public MessageHandler mqttOutbound() {
-        return MqttConfigUtils.mqttOutbound(TOPIC);
-    }
-
-}

+ 0 - 30
sys-sdk/src/main/java/com/xy/utils/MqttConsumeErrorDataUtils.java

@@ -1,30 +0,0 @@
-package com.xy.utils;
-
-import cn.hutool.json.JSONObject;
-import com.xy.producer.LogsProducerConfiguration;
-import com.xy.producer.MqttConsumeErrorDataProducer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class MqttConsumeErrorDataUtils {
-
-    private static MqttConsumeErrorDataProducer mqttConsumeErrorDataProducer;
-
-    @Autowired(required = false)
-    public void setMqttProducer(MqttConsumeErrorDataProducer mqttConsumeErrorDataProducer) {
-        MqttConsumeErrorDataUtils.mqttConsumeErrorDataProducer = mqttConsumeErrorDataProducer;
-    }
-
-    /**
-     * 添加异常
-     */
-    public static void save(String topic, String msg) {
-        JSONObject jsonObject = new JSONObject()
-                .set("topic", topic)
-                .set("msg", msg)
-                .set("createTime", DataTime.getSring());
-        mqttConsumeErrorDataProducer.sendToMqtt(jsonObject.toString(), LogsProducerConfiguration.TOPIC, 0);
-    }
-
-}