Browse Source

Merge branch 'master' into test

李进 1 year ago
parent
commit
0e651fb048

+ 13 - 15
sys-api-service/src/main/java/com/xy/consumer/log/LogConsumer.java

@@ -2,6 +2,7 @@ package com.xy.consumer.log;
 
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
+import com.xy.annotate.MqttConsumerAsyn;
 import com.xy.config.SysThreadPoolConfig;
 import com.xy.consumer.MqttConsumer;
 import com.xy.entity.LogOperate;
@@ -11,7 +12,6 @@ import com.xy.mapper.LogOperateMapper;
 import com.xy.mapper.LogSysEventsMapper;
 import com.xy.mapper.LogTasksMapper;
 import com.xy.utils.LambdaUtils;
-import com.xy.utils.ThreadPoolUtils;
 import lombok.RequiredArgsConstructor;
 import org.springframework.stereotype.Service;
 
@@ -20,6 +20,7 @@ import org.springframework.stereotype.Service;
  */
 @Service
 @RequiredArgsConstructor
+@MqttConsumerAsyn(value = SysThreadPoolConfig.LOG_POLL, isFailSaveEs = false)
 public class LogConsumer implements MqttConsumer {
 
     private final LogOperateMapper logOperateMapper;
@@ -30,20 +31,17 @@ public class LogConsumer implements MqttConsumer {
 
     @Override
     public boolean message(String topic, String payload) {
-        ThreadPoolUtils.excPoll(SysThreadPoolConfig.LOG_POLL, 1)
-                .execute(() -> {
-                    JSONObject jsonObject = JSONUtil.parseObj(payload);
-                    String type = jsonObject.getStr(LambdaUtils.getProperty(LogOperate::getType));
-                    if ("operate".equals(type)) {
-                        logOperateMapper.insert(jsonObject.toBean(LogOperate.class));
-                    }
-                    if ("events".equals(type)) {
-                        logSysEventsMapper.insert(jsonObject.toBean(LogSysEvents.class));
-                    }
-                    if ("tasks".equals(type)) {
-                        logTasksMapper.insert(jsonObject.toBean(LogTasks.class));
-                    }
-                });
+        JSONObject jsonObject = JSONUtil.parseObj(payload);
+        String type = jsonObject.getStr(LambdaUtils.getProperty(LogOperate::getType));
+        if ("operate".equals(type)) {
+            logOperateMapper.insert(jsonObject.toBean(LogOperate.class));
+        }
+        if ("events".equals(type)) {
+            logSysEventsMapper.insert(jsonObject.toBean(LogSysEvents.class));
+        }
+        if ("tasks".equals(type)) {
+            logTasksMapper.insert(jsonObject.toBean(LogTasks.class));
+        }
         return true;
     }
 }

+ 4 - 7
sys-api-service/src/main/java/com/xy/consumer/me/MqttConsumeErrorDataConsumer.java

@@ -1,11 +1,11 @@
 package com.xy.consumer.me;
 
 import cn.hutool.json.JSONUtil;
+import com.xy.annotate.MqttConsumerAsyn;
 import com.xy.config.SysThreadPoolConfig;
 import com.xy.consumer.MqttConsumer;
 import com.xy.entity.MqttConsumeErrorData;
 import com.xy.mapper.MqttConsumeErrorDataMapper;
-import com.xy.utils.ThreadPoolUtils;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -16,18 +16,15 @@ import org.springframework.stereotype.Service;
 @Slf4j
 @Service
 @RequiredArgsConstructor
+@MqttConsumerAsyn(value = SysThreadPoolConfig.MQTT_CONSUME_ERROR_DATA, isFailSaveEs = false)
 public class MqttConsumeErrorDataConsumer implements MqttConsumer {
 
     private final MqttConsumeErrorDataMapper mqttConsumeErrorDataMapper;
 
     @Override
     public boolean message(String topic, String payload) {
-        log.info("mqtt消费失败数据:{}", payload);
-        ThreadPoolUtils.excPoll(SysThreadPoolConfig.MQTT_CONSUME_ERROR_DATA, 1)
-                .execute(() -> {
-                    MqttConsumeErrorData mqttConsumeErrorData = JSONUtil.parseObj(payload).toBean(MqttConsumeErrorData.class);
-                    mqttConsumeErrorDataMapper.insert(mqttConsumeErrorData);
-                });
+        MqttConsumeErrorData mqttConsumeErrorData = JSONUtil.parseObj(payload).toBean(MqttConsumeErrorData.class);
+        mqttConsumeErrorDataMapper.insert(mqttConsumeErrorData);
         return true;
     }
 }