Selaa lähdekoodia

Merge branch 'mqtt' into test

李进 1 vuosi sitten
vanhempi
commit
64df8eb0cd

+ 10 - 0
sys-api-feign/src/main/java/com/xy/feign/SysMqttConsumerFeign.java

@@ -0,0 +1,10 @@
+package com.xy.feign;
+
+import com.xy.FeignInterceptor;
+import com.xy.consts.ServiceConsts;
+import com.xy.service.SysMqttConsumer;
+import org.springframework.cloud.openfeign.FeignClient;
+
+@FeignClient(value = ServiceConsts.SERVICE_NAME, configuration = FeignInterceptor.class)
+public interface SysMqttConsumerFeign extends SysMqttConsumer {
+}

+ 5 - 0
sys-api-service/pom.xml

@@ -96,6 +96,11 @@
             <artifactId>xy-alipay</artifactId>
             <version>1.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.xy</groupId>
+            <artifactId>mqtt-api</artifactId>
+            <version>1.0</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 0 - 55
sys-api-service/src/main/java/com/xy/consumer/log/LogConsumerConfiguration.java

@@ -1,55 +0,0 @@
-package com.xy.consumer.log;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.core.MessageProducer;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class LogConsumerConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "log";
-
-    /**
-     * 入站通道名(消费者)订阅的bean名称
-     */
-    public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
-
-
-    /*******************************消费者*******************************************/
-
-    /**
-     * MQTT信息通道(消费者)
-     */
-    @Bean(name = CHANNEL_NAME_IN)
-    public MessageChannel mqttInboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息订阅绑定(消费者)
-     */
-    @Bean(name = TOPIC + "Inbound")
-    public MessageProducer inbound() {
-        return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
-    }
-
-    /**
-     * MQTT消息处理器(消费者)
-     */
-    @Bean(name = TOPIC + "Handler")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
-    public MessageHandler handler() {
-        return MqttConfigUtils.handler();
-    }
-
-}

+ 0 - 30
sys-api-service/src/main/java/com/xy/consumer/me/MqttConsumeErrorDataConsumer.java

@@ -1,30 +0,0 @@
-package com.xy.consumer.me;
-
-import cn.hutool.json.JSONUtil;
-import com.xy.annotate.MqttConsumerAsyn;
-import com.xy.config.SysThreadPoolConfig;
-import com.xy.consumer.MqttConsumer;
-import com.xy.entity.MqttConsumeErrorData;
-import com.xy.mapper.MqttConsumeErrorDataMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-/**
- * mqtt消费失败数据消费者
- */
-@Slf4j
-@Service
-@RequiredArgsConstructor
-@MqttConsumerAsyn(value = SysThreadPoolConfig.MQTT_CONSUME_ERROR_DATA, isFailSaveEs = false)
-public class MqttConsumeErrorDataConsumer implements MqttConsumer {
-
-    private final MqttConsumeErrorDataMapper mqttConsumeErrorDataMapper;
-
-    @Override
-    public boolean message(String topic, String payload) {
-        MqttConsumeErrorData mqttConsumeErrorData = JSONUtil.parseObj(payload).toBean(MqttConsumeErrorData.class);
-        mqttConsumeErrorDataMapper.insert(mqttConsumeErrorData);
-        return true;
-    }
-}

+ 0 - 55
sys-api-service/src/main/java/com/xy/consumer/me/MqttConsumeErrorDataConsumerConfiguration.java

@@ -1,55 +0,0 @@
-package com.xy.consumer.me;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.core.MessageProducer;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class MqttConsumeErrorDataConsumerConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "mqttConsumeErrorData";
-
-    /**
-     * 入站通道名(消费者)订阅的bean名称
-     */
-    public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
-
-
-    /*******************************消费者*******************************************/
-
-    /**
-     * MQTT信息通道(消费者)
-     */
-    @Bean(name = CHANNEL_NAME_IN)
-    public MessageChannel mqttInboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息订阅绑定(消费者)
-     */
-    @Bean(name = TOPIC + "Inbound")
-    public MessageProducer inbound() {
-        return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
-    }
-
-    /**
-     * MQTT消息处理器(消费者)
-     */
-    @Bean(name = TOPIC + "Handler")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
-    public MessageHandler handler() {
-        return MqttConfigUtils.handler();
-    }
-
-}

+ 16 - 16
sys-api-service/src/main/java/com/xy/consumer/log/LogConsumer.java → sys-api-service/src/main/java/com/xy/service/SysMqttConsumerImpl.java

@@ -1,10 +1,9 @@
-package com.xy.consumer.log;
+package com.xy.service;
 
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
-import com.xy.annotate.MqttConsumerAsyn;
 import com.xy.config.SysThreadPoolConfig;
-import com.xy.consumer.MqttConsumer;
+import com.xy.dto.SysMqttDto;
 import com.xy.entity.LogOperate;
 import com.xy.entity.LogSysEvents;
 import com.xy.entity.LogTasks;
@@ -12,26 +11,28 @@ import com.xy.mapper.LogOperateMapper;
 import com.xy.mapper.LogSysEventsMapper;
 import com.xy.mapper.LogTasksMapper;
 import com.xy.utils.LambdaUtils;
-import lombok.RequiredArgsConstructor;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.AllArgsConstructor;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
-/**
- * 日志消费者
- */
 @Service
-@RequiredArgsConstructor
-@MqttConsumerAsyn(value = SysThreadPoolConfig.LOG_POLL, isFailSaveEs = false)
-public class LogConsumer implements MqttConsumer {
+@AllArgsConstructor
+@Api(tags = "系统消费者")
+public class SysMqttConsumerImpl implements SysMqttConsumer {
 
-    private final LogOperateMapper logOperateMapper;
+    private LogOperateMapper logOperateMapper;
 
-    private final LogSysEventsMapper logSysEventsMapper;
+    private LogSysEventsMapper logSysEventsMapper;
 
-    private final LogTasksMapper logTasksMapper;
+    private LogTasksMapper logTasksMapper;
 
     @Override
-    public boolean message(String topic, String payload) {
-        JSONObject jsonObject = JSONUtil.parseObj(payload);
+    @ApiOperation("日志")
+    @Async(SysThreadPoolConfig.LOG_POLL)
+    public void logs(SysMqttDto.RequestParams requestParams) {
+        JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
         String type = jsonObject.getStr(LambdaUtils.getProperty(LogOperate::getType));
         if ("operate".equals(type)) {
             logOperateMapper.insert(jsonObject.toBean(LogOperate.class));
@@ -42,6 +43,5 @@ public class LogConsumer implements MqttConsumer {
         if ("tasks".equals(type)) {
             logTasksMapper.insert(jsonObject.toBean(LogTasks.class));
         }
-        return true;
     }
 }

+ 25 - 0
sys-api/src/main/java/com/xy/dto/SysMqttDto.java

@@ -0,0 +1,25 @@
+package com.xy.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import javax.validation.constraints.NotBlank;
+
+public class SysMqttDto {
+
+    @Data
+    @Accessors(chain = true)
+    public static class RequestParams {
+
+        @NotBlank(message = "topic不能为空")
+        @ApiModelProperty(value = "topic", required = true)
+        private String topic;
+
+        @NotBlank(message = "data不能为空")
+        @ApiModelProperty(value = "消息内容", required = true)
+        private String data;
+
+    }
+
+}

+ 23 - 0
sys-api/src/main/java/com/xy/service/SysMqttConsumer.java

@@ -0,0 +1,23 @@
+package com.xy.service;
+
+import com.xy.annotate.RestMappingController;
+import com.xy.dto.SysMqttDto;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+
+/**
+ * 系统消费者 接口
+ */
+@RestMappingController("sys-mqtt-consumer")
+public interface SysMqttConsumer {
+
+    /**
+     * 日志
+     *
+     * @param requestParams
+     */
+    @PostMapping("logs")
+    void logs(@RequestBody @Validated SysMqttDto.RequestParams requestParams);
+
+}

+ 1 - 1
sys-sdk/pom.xml

@@ -30,7 +30,7 @@
         </dependency>
         <dependency>
             <groupId>com.xy</groupId>
-            <artifactId>xy-mqtt</artifactId>
+            <artifactId>mqtt-api</artifactId>
             <version>1.0</version>
         </dependency>
         <dependency>

+ 27 - 0
sys-sdk/src/main/java/com/xy/config/SpringConfigs.java

@@ -0,0 +1,27 @@
+package com.xy.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Data
+@Component
+@ConfigurationProperties(prefix = "spring")
+public class SpringConfigs {
+
+    private Profiles profiles;
+
+    private Application application;
+
+    @Data
+    public static class Profiles {
+
+        private String active;
+    }
+
+    @Data
+    public static class Application {
+
+        private String name;
+    }
+}

+ 0 - 26
sys-sdk/src/main/java/com/xy/producer/LogsProducer.java

@@ -1,26 +0,0 @@
-package com.xy.producer;
-
-import org.springframework.integration.annotation.MessagingGateway;
-import org.springframework.integration.mqtt.support.MqttHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.stereotype.Component;
-
-/**
- * 生产者接口
- */
-@Component
-@MessagingGateway(defaultRequestChannel = LogsProducerConfiguration.CHANNEL_NAME_OUT)
-public interface LogsProducer {
-
-    /**
-     * ata是发送消息的内容
-     * topic是消息发送的主题,就是配置文件的主题
-     * qos是mqtt 对消息处理机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
-     */
-
-    void sendToMqtt(String data);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
-}

+ 0 - 46
sys-sdk/src/main/java/com/xy/producer/LogsProducerConfiguration.java

@@ -1,46 +0,0 @@
-package com.xy.producer;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class LogsProducerConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "log";
-
-    /**
-     * 出站通道名(生产者)发布的bean名称
-     */
-    public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
-
-
-    /*******************************生产者*******************************************/
-
-    /**
-     * MQTT信息通道(生产者)
-     */
-    @Bean(name = CHANNEL_NAME_OUT)
-    public MessageChannel mqttOutboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息处理器(生产者)
-     */
-    @Bean(name = TOPIC + "MqttOutbound")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
-    public MessageHandler mqttOutbound() {
-        return MqttConfigUtils.mqttOutbound(TOPIC);
-    }
-
-}

+ 8 - 8
sys-sdk/src/main/java/com/xy/utils/LogUtils.java

@@ -2,10 +2,10 @@ package com.xy.utils;
 
 import cn.hutool.json.JSONObject;
 import com.xy.config.SpringConfigs;
+import com.xy.dto.MqttDto;
 import com.xy.entity.SysCodeConfigureRedis;
 import com.xy.enums.LogEnum;
-import com.xy.producer.LogsProducer;
-import com.xy.producer.LogsProducerConfiguration;
+import com.xy.service.SysMqttSendService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.web.context.request.RequestContextHolder;
@@ -19,7 +19,7 @@ public class LogUtils {
 
     private static SpringConfigs springConfigs;
 
-    private static LogsProducer logsProducer;
+    private static SysMqttSendService sysMqttSendService;
 
     @Autowired(required = false)
     public void setSpringConfigs(SpringConfigs springConfigs) {
@@ -27,8 +27,8 @@ public class LogUtils {
     }
 
     @Autowired(required = false)
-    public void setMqttProducer(LogsProducer logsProducer) {
-        LogUtils.logsProducer = logsProducer;
+    public void setSysMqttSendService(SysMqttSendService sysMqttSendService) {
+        LogUtils.sysMqttSendService = sysMqttSendService;
     }
 
     /**
@@ -64,7 +64,7 @@ public class LogUtils {
                 .set("clientIp", getIp())
                 .set("serviceName", springConfigs.getApplication().getName())
                 .set("type", "operate");
-        logsProducer.sendToMqtt(jsonObject.toString(), LogsProducerConfiguration.TOPIC, 0);
+        sysMqttSendService.log(new MqttDto.RequestParams().setQos(0).setData(jsonObject.toString()));
     }
 
     /**
@@ -86,7 +86,7 @@ public class LogUtils {
                 .set("serviceName", springConfigs.getApplication().getName())
                 .set("createTime", DataTime.getSring())
                 .set("type", "events");
-        logsProducer.sendToMqtt(jsonObject.toString(), LogsProducerConfiguration.TOPIC, 0);
+        sysMqttSendService.log(new MqttDto.RequestParams().setQos(0).setData(jsonObject.toString()));
     }
 
     /**
@@ -106,7 +106,7 @@ public class LogUtils {
                 .set("serviceName", springConfigs.getApplication().getName())
                 .set("createTime", DataTime.getSring())
                 .set("type", "tasks");
-        logsProducer.sendToMqtt(jsonObject.toString(), LogsProducerConfiguration.TOPIC, 0);
+        sysMqttSendService.log(new MqttDto.RequestParams().setQos(0).setData(jsonObject.toString()));
     }
 
     /**

+ 6 - 2
sys-start/pom.xml

@@ -35,17 +35,21 @@
         </dependency>
         <dependency>
             <groupId>com.xy</groupId>
-            <artifactId>xy-logback</artifactId>
+            <artifactId>mqtt-api-cloud</artifactId>
             <version>1.0</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.xy</groupId>
+            <artifactId>xy-logback</artifactId>
+            <version>1.0</version>
+        </dependency>
         <!-- torna swagger 插件 -->
         <dependency>
             <groupId>com.xy</groupId>
             <artifactId>xy-torna</artifactId>
             <version>1.0</version>
         </dependency>
-
         <!-- es -->
         <dependency>
             <groupId>org.elasticsearch.client</groupId>

+ 0 - 7
sys-start/src/main/resources/bootstrap-prod.yml

@@ -1,10 +1,3 @@
-spring:
-  mqtt:
-    topics:
-      - { topic: "msgNotify", handler: com.xy.consumer.MsgConsumer }
-      - { topic: "log", handler: com.xy.consumer.log.LogConsumer }
-      - { topic: "mqttConsumeErrorData", handler: com.xy.consumer.me.MqttConsumeErrorDataConsumer }
-
 #微服务相关配置
 cloud:
   center:

+ 0 - 7
sys-start/src/main/resources/bootstrap-uat.yml

@@ -1,10 +1,3 @@
-spring:
-  mqtt:
-    topics:
-      - { topic: "msgNotify", handler: com.xy.consumer.MsgConsumer }
-      - { topic: "log", handler: com.xy.consumer.log.LogConsumer }
-      - { topic: "mqttConsumeErrorData", handler: com.xy.consumer.me.MqttConsumeErrorDataConsumer }
-
 #微服务相关配置
 cloud:
   service:

+ 1 - 1
sys-start/src/main/resources/bootstrap.yml

@@ -10,7 +10,7 @@ cloud:
   center:
     url: 119.96.213.127:9007
     config:
-      shared-configs: redis.yaml,mysql.yaml,mqtt.yaml,xy-oss.yaml,xxl-job.yaml,es.yaml
+      shared-configs: redis.yaml,mysql.yaml,xy-oss.yaml,xxl-job.yaml,es.yaml
       name: sys
   service:
     name: dev-sys