Parcourir la source

mqtt消费者失败放入es

李进 il y a 1 an
Parent
commit
be698de664

+ 49 - 0
sys-api-service/src/main/java/com/xy/consumer/log/LogConsumer.java

@@ -0,0 +1,49 @@
+package com.xy.consumer.log;
+
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.xy.config.SysThreadPoolConfig;
+import com.xy.consumer.MqttConsumer;
+import com.xy.entity.LogOperate;
+import com.xy.entity.LogSysEvents;
+import com.xy.entity.LogTasks;
+import com.xy.mapper.LogOperateMapper;
+import com.xy.mapper.LogSysEventsMapper;
+import com.xy.mapper.LogTasksMapper;
+import com.xy.utils.LambdaUtils;
+import com.xy.utils.ThreadPoolUtils;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Service;
+
+/**
+ * 日志消费者
+ */
+@Service
+@RequiredArgsConstructor
+public class LogConsumer implements MqttConsumer {
+
+    private final LogOperateMapper logOperateMapper;
+
+    private final LogSysEventsMapper logSysEventsMapper;
+
+    private final LogTasksMapper logTasksMapper;
+
+    @Override
+    public boolean message(String topic, String payload) {
+        ThreadPoolUtils.excPoll(SysThreadPoolConfig.LOG_POLL, 1)
+                .execute(() -> {
+                    JSONObject jsonObject = JSONUtil.parseObj(payload);
+                    String type = jsonObject.getStr(LambdaUtils.getProperty(LogOperate::getType));
+                    if ("operate".equals(type)) {
+                        logOperateMapper.insert(jsonObject.toBean(LogOperate.class));
+                    }
+                    if ("events".equals(type)) {
+                        logSysEventsMapper.insert(jsonObject.toBean(LogSysEvents.class));
+                    }
+                    if ("tasks".equals(type)) {
+                        logTasksMapper.insert(jsonObject.toBean(LogTasks.class));
+                    }
+                });
+        return true;
+    }
+}

+ 55 - 0
sys-api-service/src/main/java/com/xy/consumer/log/LogConsumerConfiguration.java

@@ -0,0 +1,55 @@
+package com.xy.consumer.log;
+
+import com.xy.configuration.MqttConfigUtils;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+@Configuration
+@IntegrationComponentScan
+public class LogConsumerConfiguration {
+
+    /**
+     * topic
+     */
+    public final static String TOPIC = "log";
+
+    /**
+     * 入站通道名(消费者)订阅的bean名称
+     */
+    public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
+
+
+    /*******************************消费者*******************************************/
+
+    /**
+     * MQTT信息通道(消费者)
+     */
+    @Bean(name = CHANNEL_NAME_IN)
+    public MessageChannel mqttInboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * MQTT消息订阅绑定(消费者)
+     */
+    @Bean(name = TOPIC + "Inbound")
+    public MessageProducer inbound() {
+        return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
+    }
+
+    /**
+     * MQTT消息处理器(消费者)
+     */
+    @Bean(name = TOPIC + "Handler")
+    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
+    public MessageHandler handler() {
+        return MqttConfigUtils.handler();
+    }
+
+}