Explorar el Código

mqtt消费者失败放入es

李进 hace 1 año
padre
commit
da53aa7f5c

+ 20 - 0
sys-api-service/src/main/java/com/xy/config/SysThreadPoolConfig.java

@@ -11,6 +11,8 @@ public class SysThreadPoolConfig {
 
     public static final String LOG_POLL = "logPoll";
 
+    public static final String MQTT_CONSUME_ERROR_DATA = "mqttConsumeErrorData";
+
     /**
      * 日志线程池
      */
@@ -26,4 +28,22 @@ public class SysThreadPoolConfig {
                 .queueSize(coreSize * 10)
                 .builder();
     }
+
+    /**
+     * mqtt消费失败数据线程池
+     *
+     * @return
+     */
+    @DynamicTp
+    @Bean(MQTT_CONSUME_ERROR_DATA)
+    public ThreadPoolTaskExecutor mqttConsumeErrorData() {
+        int coreSize = 2;
+        return ThreadPoolUtils.newPoll()
+                .name(MQTT_CONSUME_ERROR_DATA)
+                .coreSize(coreSize)
+                .maxSize(coreSize * 2)
+                .keepAlive(60)
+                .queueSize(coreSize * 10)
+                .builder();
+    }
 }

+ 0 - 48
sys-api-service/src/main/java/com/xy/consumer/LogConsumer.java

@@ -1,48 +0,0 @@
-package com.xy.consumer;
-
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
-import com.xy.config.SysThreadPoolConfig;
-import com.xy.entity.LogOperate;
-import com.xy.entity.LogSysEvents;
-import com.xy.entity.LogTasks;
-import com.xy.mapper.LogOperateMapper;
-import com.xy.mapper.LogSysEventsMapper;
-import com.xy.mapper.LogTasksMapper;
-import com.xy.utils.LambdaUtils;
-import com.xy.utils.ThreadPoolUtils;
-import lombok.RequiredArgsConstructor;
-import org.springframework.stereotype.Service;
-
-/**
- * 日志消费者
- */
-@Service
-@RequiredArgsConstructor
-public class LogConsumer implements MqttConsumer {
-
-    private final LogOperateMapper logOperateMapper;
-
-    private final LogSysEventsMapper logSysEventsMapper;
-
-    private final LogTasksMapper logTasksMapper;
-
-    @Override
-    public boolean message(String topic, String payload) {
-        ThreadPoolUtils.excPoll(SysThreadPoolConfig.LOG_POLL, 1)
-                .execute(() -> {
-                    JSONObject jsonObject = JSONUtil.parseObj(payload);
-                    String type = jsonObject.getStr(LambdaUtils.getProperty(LogOperate::getType));
-                    if ("operate".equals(type)) {
-                        logOperateMapper.insert(jsonObject.toBean(LogOperate.class));
-                    }
-                    if ("events".equals(type)) {
-                        logSysEventsMapper.insert(jsonObject.toBean(LogSysEvents.class));
-                    }
-                    if ("tasks".equals(type)) {
-                        logTasksMapper.insert(jsonObject.toBean(LogTasks.class));
-                    }
-                });
-        return true;
-    }
-}

+ 33 - 0
sys-api-service/src/main/java/com/xy/consumer/me/MqttConsumeErrorDataConsumer.java

@@ -0,0 +1,33 @@
+package com.xy.consumer.me;
+
+import cn.hutool.json.JSONUtil;
+import com.xy.config.SysThreadPoolConfig;
+import com.xy.consumer.MqttConsumer;
+import com.xy.entity.MqttConsumeErrorData;
+import com.xy.mapper.MqttConsumeErrorDataMapper;
+import com.xy.utils.ThreadPoolUtils;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+ * mqtt消费失败数据消费者
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class MqttConsumeErrorDataConsumer implements MqttConsumer {
+
+    private final MqttConsumeErrorDataMapper mqttConsumeErrorDataMapper;
+
+    @Override
+    public boolean message(String topic, String payload) {
+        log.info("mqtt消费失败数据:{}", payload);
+        ThreadPoolUtils.excPoll(SysThreadPoolConfig.MQTT_CONSUME_ERROR_DATA, 1)
+                .execute(() -> {
+                    MqttConsumeErrorData mqttConsumeErrorData = JSONUtil.parseObj(payload).toBean(MqttConsumeErrorData.class);
+                    mqttConsumeErrorDataMapper.insert(mqttConsumeErrorData);
+                });
+        return true;
+    }
+}

+ 3 - 3
sys-api-service/src/main/java/com/xy/consumer/LogConsumerConfiguration.java → sys-api-service/src/main/java/com/xy/consumer/me/MqttConsumeErrorDataConsumerConfiguration.java

@@ -1,4 +1,4 @@
-package com.xy.consumer;
+package com.xy.consumer.me;
 
 import com.xy.configuration.MqttConfigUtils;
 import org.springframework.context.annotation.Bean;
@@ -12,12 +12,12 @@ import org.springframework.messaging.MessageHandler;
 
 @Configuration
 @IntegrationComponentScan
-public class LogConsumerConfiguration {
+public class MqttConsumeErrorDataConsumerConfiguration {
 
     /**
      * topic
      */
-    public final static String TOPIC = "log";
+    public final static String TOPIC = "mqttConsumeErrorData";
 
     /**
      * 入站通道名(消费者)订阅的bean名称

+ 34 - 0
sys-api-service/src/main/java/com/xy/entity/MqttConsumeErrorData.java

@@ -0,0 +1,34 @@
+package com.xy.entity;
+
+import cn.easyes.annotation.IndexId;
+import cn.easyes.annotation.IndexName;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+/**
+ * mqtt消费失败数据
+ */
+@Data
+@Accessors(chain = true)
+@IndexName("mqtt_error_data")
+public class MqttConsumeErrorData {
+
+    @IndexId
+    private String id;
+
+    /**
+     * 分组
+     */
+    private String group;
+
+    /**
+     * 内容
+     */
+    private String msg;
+
+    /**
+     * 创建时间
+     */
+    private String createTime;
+
+}

+ 7 - 0
sys-api-service/src/main/java/com/xy/mapper/MqttConsumeErrorDataMapper.java

@@ -0,0 +1,7 @@
+package com.xy.mapper;
+
+import cn.easyes.core.conditions.interfaces.BaseEsMapper;
+import com.xy.entity.MqttConsumeErrorData;
+
+public interface MqttConsumeErrorDataMapper extends BaseEsMapper<MqttConsumeErrorData> {
+}

+ 41 - 0
sys-api-service/src/main/java/com/xy/service/MqttConsumeErrorDataServiceImpl.java

@@ -0,0 +1,41 @@
+package com.xy.service;
+
+import cn.easyes.core.biz.EsPageInfo;
+import cn.easyes.core.conditions.LambdaEsQueryWrapper;
+import com.xy.dto.MqttConsumeErrorDataDto;
+import com.xy.entity.MqttConsumeErrorData;
+import com.xy.mapper.MqttConsumeErrorDataMapper;
+import com.xy.utils.EsQueryWrapper;
+import com.xy.utils.PageBean;
+import com.xy.utils.R;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.AllArgsConstructor;
+import org.springframework.stereotype.Service;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+
+import static com.xy.utils.EsBeans.toPageBean;
+
+@Service
+@AllArgsConstructor
+@Api(tags = "mqtt消费失败数据")
+public class MqttConsumeErrorDataServiceImpl implements MqttConsumeErrorDataService {
+
+    private MqttConsumeErrorDataMapper mqttConsumeErrorDataMapper;
+
+    @PostMapping("page")
+    @ApiOperation("分页查询")
+    public R<PageBean<MqttConsumeErrorDataDto.Vo>> page(@RequestBody MqttConsumeErrorDataDto.Page page) {
+        PageBean pageBean = page.getPage();
+        LambdaEsQueryWrapper<MqttConsumeErrorData> lambdaEsQueryWrapper = new EsQueryWrapper().eqWrapper(page, MqttConsumeErrorData.class)
+                .ge(MqttConsumeErrorData::getCreateTime, page.getBeginCreateTime())
+                .le(MqttConsumeErrorData::getCreateTime, page.getEndCreateTime())
+                .like(MqttConsumeErrorData::getMsg)
+                .build()
+                .orderByDesc(MqttConsumeErrorData::getCreateTime);
+        EsPageInfo<MqttConsumeErrorData> esPageInfo = mqttConsumeErrorDataMapper.pageQuery(lambdaEsQueryWrapper, (int) pageBean.getCurrent(), (int) pageBean.getSize());
+        return R.ok(toPageBean(MqttConsumeErrorDataDto.Vo.class, esPageInfo));
+    }
+
+}

+ 58 - 0
sys-api/src/main/java/com/xy/dto/MqttConsumeErrorDataDto.java

@@ -0,0 +1,58 @@
+package com.xy.dto;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.xy.utils.PageBean;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+public class MqttConsumeErrorDataDto {
+
+    @Data
+    @Accessors(chain = true)
+    public static class Page extends Vo {
+
+        private PageBean page;
+
+        @JsonFormat(pattern = "yyyy-MM-dd")
+        @ApiModelProperty(value = "创建时间-起")
+        private LocalDate beginCreateTime;
+
+        @JsonFormat(pattern = "yyyy-MM-dd")
+        @ApiModelProperty(value = "创建时间-始")
+        private LocalDate endCreateTime;
+
+        public LocalDateTime getBeginCreateTime() {
+            return beginCreateTime == null ? null : beginCreateTime.atTime(0, 0, 0);
+        }
+
+        public LocalDateTime getEndCreateTime() {
+            return endCreateTime == null ? null : endCreateTime.atTime(23, 59, 59);
+        }
+    }
+
+    @Data
+    @Accessors(chain = true)
+    public static class Vo {
+
+        private String id;
+
+        /**
+         * 分组
+         */
+        private String group;
+
+        /**
+         * 内容
+         */
+        private String msg;
+
+        /**
+         * 创建时间
+         */
+        private String createTime;
+    }
+}

+ 10 - 0
sys-api/src/main/java/com/xy/service/MqttConsumeErrorDataService.java

@@ -0,0 +1,10 @@
+package com.xy.service;
+
+import com.xy.annotate.RestMappingController;
+
+/**
+ * mqtt消费失败数据
+ */
+@RestMappingController("mqtt-consume-error-data")
+public interface MqttConsumeErrorDataService {
+}

+ 26 - 0
sys-sdk/src/main/java/com/xy/producer/MqttConsumeErrorDataProducer.java

@@ -0,0 +1,26 @@
+package com.xy.producer;
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+/**
+ * 生产者接口
+ */
+@Component
+@MessagingGateway(defaultRequestChannel = MqttConsumeErrorDataProducerConfiguration.CHANNEL_NAME_OUT)
+public interface MqttConsumeErrorDataProducer {
+
+    /**
+     * ata是发送消息的内容
+     * topic是消息发送的主题,就是配置文件的主题
+     * qos是mqtt 对消息处理机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
+     */
+
+    void sendToMqtt(String data);
+
+    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
+
+    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
+}

+ 46 - 0
sys-sdk/src/main/java/com/xy/producer/MqttConsumeErrorDataProducerConfiguration.java

@@ -0,0 +1,46 @@
+package com.xy.producer;
+
+import com.xy.configuration.MqttConfigUtils;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+@Configuration
+@IntegrationComponentScan
+public class MqttConsumeErrorDataProducerConfiguration {
+
+    /**
+     * topic
+     */
+    public final static String TOPIC = "mqttConsumeErrorData";
+
+    /**
+     * 出站通道名(生产者)发布的bean名称
+     */
+    public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
+
+
+    /*******************************生产者*******************************************/
+
+    /**
+     * MQTT信息通道(生产者)
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * MQTT消息处理器(生产者)
+     */
+    @Bean(name = TOPIC + "MqttOutbound")
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler mqttOutbound() {
+        return MqttConfigUtils.mqttOutbound(TOPIC);
+    }
+
+}

+ 30 - 0
sys-sdk/src/main/java/com/xy/utils/MqttConsumeErrorDataUtils.java

@@ -0,0 +1,30 @@
+package com.xy.utils;
+
+import cn.hutool.json.JSONObject;
+import com.xy.producer.LogsProducerConfiguration;
+import com.xy.producer.MqttConsumeErrorDataProducer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MqttConsumeErrorDataUtils {
+
+    private static MqttConsumeErrorDataProducer mqttConsumeErrorDataProducer;
+
+    @Autowired(required = false)
+    public void setMqttProducer(MqttConsumeErrorDataProducer mqttConsumeErrorDataProducer) {
+        MqttConsumeErrorDataUtils.mqttConsumeErrorDataProducer = mqttConsumeErrorDataProducer;
+    }
+
+    /**
+     * 添加异常
+     */
+    public static void save(String group, String msg) {
+        JSONObject jsonObject = new JSONObject()
+                .set("group", group)
+                .set("msg", msg)
+                .set("createTime", DataTime.getSring());
+        mqttConsumeErrorDataProducer.sendToMqtt(jsonObject.toString(), LogsProducerConfiguration.TOPIC, 0);
+    }
+
+}

+ 3 - 2
sys-start/src/main/resources/bootstrap-prod.yml

@@ -1,8 +1,9 @@
 spring:
   mqtt:
     topics:
-      - { topic: "msgNotify", handler: com.xy.consumer.MsgConsumer }
-      - { topic: "log", handler: com.xy.consumer.LogConsumer }
+      - { topic: "msgNotify", handler: com.xy.consumer.log.MsgConsumer }
+      - { topic: "log", handler: com.xy.consumer.log.LogConsumer }
+      - { topic: "mqttConsumeErrorData", handler: com.xy.consumer.me.MqttConsumeErrorDataConsumer }
 
 #微服务相关配置
 cloud:

+ 3 - 2
sys-start/src/main/resources/bootstrap-uat.yml

@@ -1,8 +1,9 @@
 spring:
   mqtt:
     topics:
-      - { topic: "msgNotify", handler: com.xy.consumer.MsgConsumer }
-      - { topic: "log", handler: com.xy.consumer.LogConsumer }
+      - { topic: "msgNotify", handler: com.xy.consumer.log.MsgConsumer }
+      - { topic: "log", handler: com.xy.consumer.log.LogConsumer }
+      - { topic: "mqttConsumeErrorData", handler: com.xy.consumer.me.MqttConsumeErrorDataConsumer }
 
 #微服务相关配置
 cloud: