李进 1 год назад
Родитель
Сommit
277144ea23

+ 10 - 0
sys-api-feign/src/main/java/com/xy/feign/SysMqttConsumerFeign.java

@@ -0,0 +1,10 @@
+package com.xy.feign;
+
+import com.xy.FeignInterceptor;
+import com.xy.consts.ServiceConsts;
+import com.xy.service.SysMqttConsumer;
+import org.springframework.cloud.openfeign.FeignClient;
+
+@FeignClient(value = ServiceConsts.SERVICE_NAME, configuration = FeignInterceptor.class)
+public interface SysMqttConsumerFeign extends SysMqttConsumer {
+}

+ 5 - 0
sys-api-service/pom.xml

@@ -96,6 +96,11 @@
             <artifactId>xy-alipay</artifactId>
             <version>1.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.xy</groupId>
+            <artifactId>mqtt-api</artifactId>
+            <version>1.0</version>
+        </dependency>
     </dependencies>
 
     <build>

+ 0 - 55
sys-api-service/src/main/java/com/xy/consumer/log/LogConsumerConfiguration.java

@@ -1,55 +0,0 @@
-package com.xy.consumer.log;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.core.MessageProducer;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class LogConsumerConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "log";
-
-    /**
-     * 入站通道名(消费者)订阅的bean名称
-     */
-    public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
-
-
-    /*******************************消费者*******************************************/
-
-    /**
-     * MQTT信息通道(消费者)
-     */
-    @Bean(name = CHANNEL_NAME_IN)
-    public MessageChannel mqttInboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息订阅绑定(消费者)
-     */
-    @Bean(name = TOPIC + "Inbound")
-    public MessageProducer inbound() {
-        return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
-    }
-
-    /**
-     * MQTT消息处理器(消费者)
-     */
-    @Bean(name = TOPIC + "Handler")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
-    public MessageHandler handler() {
-        return MqttConfigUtils.handler();
-    }
-
-}

+ 0 - 30
sys-api-service/src/main/java/com/xy/consumer/me/MqttConsumeErrorDataConsumer.java

@@ -1,30 +0,0 @@
-package com.xy.consumer.me;
-
-import cn.hutool.json.JSONUtil;
-import com.xy.annotate.MqttConsumerAsyn;
-import com.xy.config.SysThreadPoolConfig;
-import com.xy.consumer.MqttConsumer;
-import com.xy.entity.MqttConsumeErrorData;
-import com.xy.mapper.MqttConsumeErrorDataMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-/**
- * mqtt消费失败数据消费者
- */
-@Slf4j
-@Service
-@RequiredArgsConstructor
-@MqttConsumerAsyn(value = SysThreadPoolConfig.MQTT_CONSUME_ERROR_DATA, isFailSaveEs = false)
-public class MqttConsumeErrorDataConsumer implements MqttConsumer {
-
-    private final MqttConsumeErrorDataMapper mqttConsumeErrorDataMapper;
-
-    @Override
-    public boolean message(String topic, String payload) {
-        MqttConsumeErrorData mqttConsumeErrorData = JSONUtil.parseObj(payload).toBean(MqttConsumeErrorData.class);
-        mqttConsumeErrorDataMapper.insert(mqttConsumeErrorData);
-        return true;
-    }
-}

+ 0 - 55
sys-api-service/src/main/java/com/xy/consumer/me/MqttConsumeErrorDataConsumerConfiguration.java

@@ -1,55 +0,0 @@
-package com.xy.consumer.me;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.core.MessageProducer;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class MqttConsumeErrorDataConsumerConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "mqttConsumeErrorData";
-
-    /**
-     * 入站通道名(消费者)订阅的bean名称
-     */
-    public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
-
-
-    /*******************************消费者*******************************************/
-
-    /**
-     * MQTT信息通道(消费者)
-     */
-    @Bean(name = CHANNEL_NAME_IN)
-    public MessageChannel mqttInboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息订阅绑定(消费者)
-     */
-    @Bean(name = TOPIC + "Inbound")
-    public MessageProducer inbound() {
-        return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
-    }
-
-    /**
-     * MQTT消息处理器(消费者)
-     */
-    @Bean(name = TOPIC + "Handler")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
-    public MessageHandler handler() {
-        return MqttConfigUtils.handler();
-    }
-
-}

+ 16 - 16
sys-api-service/src/main/java/com/xy/consumer/log/LogConsumer.java → sys-api-service/src/main/java/com/xy/service/SysMqttConsumerImpl.java

@@ -1,10 +1,9 @@
-package com.xy.consumer.log;
+package com.xy.service;
 
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
-import com.xy.annotate.MqttConsumerAsyn;
 import com.xy.config.SysThreadPoolConfig;
-import com.xy.consumer.MqttConsumer;
+import com.xy.dto.SysMqttDto;
 import com.xy.entity.LogOperate;
 import com.xy.entity.LogSysEvents;
 import com.xy.entity.LogTasks;
@@ -12,26 +11,28 @@ import com.xy.mapper.LogOperateMapper;
 import com.xy.mapper.LogSysEventsMapper;
 import com.xy.mapper.LogTasksMapper;
 import com.xy.utils.LambdaUtils;
-import lombok.RequiredArgsConstructor;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.AllArgsConstructor;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
-/**
- * 日志消费者
- */
 @Service
-@RequiredArgsConstructor
-@MqttConsumerAsyn(value = SysThreadPoolConfig.LOG_POLL, isFailSaveEs = false)
-public class LogConsumer implements MqttConsumer {
+@AllArgsConstructor
+@Api(tags = "系统消费者")
+public class SysMqttConsumerImpl implements SysMqttConsumer {
 
-    private final LogOperateMapper logOperateMapper;
+    private LogOperateMapper logOperateMapper;
 
-    private final LogSysEventsMapper logSysEventsMapper;
+    private LogSysEventsMapper logSysEventsMapper;
 
-    private final LogTasksMapper logTasksMapper;
+    private LogTasksMapper logTasksMapper;
 
     @Override
-    public boolean message(String topic, String payload) {
-        JSONObject jsonObject = JSONUtil.parseObj(payload);
+    @ApiOperation("日志")
+    @Async(SysThreadPoolConfig.LOG_POLL)
+    public void logs(SysMqttDto.RequestParams requestParams) {
+        JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
         String type = jsonObject.getStr(LambdaUtils.getProperty(LogOperate::getType));
         if ("operate".equals(type)) {
             logOperateMapper.insert(jsonObject.toBean(LogOperate.class));
@@ -42,6 +43,5 @@ public class LogConsumer implements MqttConsumer {
         if ("tasks".equals(type)) {
             logTasksMapper.insert(jsonObject.toBean(LogTasks.class));
         }
-        return true;
     }
 }

+ 25 - 0
sys-api/src/main/java/com/xy/dto/SysMqttDto.java

@@ -0,0 +1,25 @@
+package com.xy.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import javax.validation.constraints.NotBlank;
+
+public class SysMqttDto {
+
+    @Data
+    @Accessors(chain = true)
+    public static class RequestParams {
+
+        @NotBlank(message = "topic不能为空")
+        @ApiModelProperty(value = "topic", required = true)
+        private String topic;
+
+        @NotBlank(message = "data不能为空")
+        @ApiModelProperty(value = "消息内容", required = true)
+        private String data;
+
+    }
+
+}

+ 23 - 0
sys-api/src/main/java/com/xy/service/SysMqttConsumer.java

@@ -0,0 +1,23 @@
+package com.xy.service;
+
+import com.xy.annotate.RestMappingController;
+import com.xy.dto.SysMqttDto;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+
+/**
+ * 系统消费者 接口
+ */
+@RestMappingController("sys-mqtt-consumer")
+public interface SysMqttConsumer {
+
+    /**
+     * 日志
+     *
+     * @param requestParams
+     */
+    @PostMapping("logs")
+    void logs(@RequestBody @Validated SysMqttDto.RequestParams requestParams);
+
+}

+ 6 - 2
sys-start/pom.xml

@@ -35,17 +35,21 @@
         </dependency>
         <dependency>
             <groupId>com.xy</groupId>
-            <artifactId>xy-logback</artifactId>
+            <artifactId>mqtt-api-cloud</artifactId>
             <version>1.0</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.xy</groupId>
+            <artifactId>xy-logback</artifactId>
+            <version>1.0</version>
+        </dependency>
         <!-- torna swagger 插件 -->
         <dependency>
             <groupId>com.xy</groupId>
             <artifactId>xy-torna</artifactId>
             <version>1.0</version>
         </dependency>
-
         <!-- es -->
         <dependency>
             <groupId>org.elasticsearch.client</groupId>

+ 0 - 7
sys-start/src/main/resources/bootstrap-prod.yml

@@ -1,10 +1,3 @@
-spring:
-  mqtt:
-    topics:
-      - { topic: "msgNotify", handler: com.xy.consumer.MsgConsumer }
-      - { topic: "log", handler: com.xy.consumer.log.LogConsumer }
-      - { topic: "mqttConsumeErrorData", handler: com.xy.consumer.me.MqttConsumeErrorDataConsumer }
-
 #微服务相关配置
 cloud:
   center:

+ 0 - 7
sys-start/src/main/resources/bootstrap-uat.yml

@@ -1,10 +1,3 @@
-spring:
-  mqtt:
-    topics:
-      - { topic: "msgNotify", handler: com.xy.consumer.MsgConsumer }
-      - { topic: "log", handler: com.xy.consumer.log.LogConsumer }
-      - { topic: "mqttConsumeErrorData", handler: com.xy.consumer.me.MqttConsumeErrorDataConsumer }
-
 #微服务相关配置
 cloud:
   service:

+ 1 - 1
sys-start/src/main/resources/bootstrap.yml

@@ -10,7 +10,7 @@ cloud:
   center:
     url: 119.96.213.127:9007
     config:
-      shared-configs: redis.yaml,mysql.yaml,mqtt.yaml,xy-oss.yaml,xxl-job.yaml,es.yaml
+      shared-configs: redis.yaml,mysql.yaml,xy-oss.yaml,xxl-job.yaml,es.yaml
       name: sys
   service:
     name: dev-sys