Explorar o código

设备故障消息推送

谭斌 %!s(int64=2) %!d(string=hai) anos
pai
achega
f1ddae2f8e

+ 6 - 0
device-api-service/pom.xml

@@ -50,6 +50,12 @@
             <version>1.0</version>
         </dependency>
 
+        <dependency>
+            <groupId>com.xy</groupId>
+            <artifactId>msg-api</artifactId>
+            <version>1.0</version>
+        </dependency>
+
         <dependency>
             <groupId>com.xy</groupId>
             <artifactId>xy-redis</artifactId>

+ 70 - 0
device-api-service/src/main/java/com/xy/consumer/device/push/msg/DevicePushMsgConsumer.java

@@ -0,0 +1,70 @@
+package com.xy.consumer.device.push.msg;
+
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.json.JSONUtil;
+import com.xy.consumer.MqttConsumer;
+import com.xy.dto.MsgConfigTestDto;
+import com.xy.entity.DeviceEventMsg;
+import com.xy.enums.ChannelType;
+import com.xy.service.MsgSendApiService;
+import com.xy.utils.R;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * 设备消息推送消费者
+ *
+ * @author 谭斌
+ * @date 2023/06/06
+ */
+@Slf4j
+@Service
+@AllArgsConstructor
+public class DevicePushMsgConsumer implements MqttConsumer {
+
+    private MsgSendApiService msgSendApiService;
+
+    @Override
+    public boolean message(String topic, String payload) {
+        DeviceEventMsg deviceEventMsg = JSONUtil.toBean(payload, DeviceEventMsg.class);
+        if (deviceEventMsg != null) {
+            Long deviceId = deviceEventMsg.getDeviceId();
+            Long mercId = deviceEventMsg.getMercId();
+            LocalDateTime createTime = deviceEventMsg.getCreateTime();
+            String msg = deviceEventMsg.getMsg();
+            Long configId = 12L;
+            List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
+            if (CollUtil.isNotEmpty(bizParams)) {
+                List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
+                bizDataList.forEach(b -> {
+                    String channelType = b.getChannelType();
+                    if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
+                        //微信公众号
+                        Map<String, Object> params = b.getTemplateParams();
+                        params.put("device", deviceId);
+                        params.put("type", msg);
+                        params.put("time", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
+                        Set<String> receivers = new HashSet<>();
+                        //TODO: 无正式公众号 , 接收人 暂时写死测试
+                        receivers.add("oT7m45tN1msid-HJ0i8I5WDCpDXE");
+                        b.setReceivers(receivers);
+                        b.setTemplateParams(params);
+                    }
+                });
+                msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(bizDataList));
+            }
+        }
+        return true;
+    }
+}

+ 79 - 0
device-api-service/src/main/java/com/xy/consumer/device/push/msg/DevicePushMsgMqttConfiguration.java

@@ -0,0 +1,79 @@
+package com.xy.consumer.device.push.msg;
+
+import com.xy.configuration.MqttConfigUtils;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.IntegrationComponentScan;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+
+@Configuration
+@IntegrationComponentScan
+public class DevicePushMsgMqttConfiguration {
+
+    /**
+     * topic
+     */
+    public final static String TOPIC = "DevicePushMsgTopic";
+
+    /**
+     * 入站通道名(消费者)订阅的bean名称
+     */
+    public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
+
+    /**
+     * 出站通道名(生产者)发布的bean名称
+     */
+    public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
+
+    /*******************************生产者*******************************************/
+
+    /**
+     * MQTT信息通道(生产者)
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * MQTT消息处理器(生产者)
+     */
+    @Bean(name = TOPIC + "MqttOutbound")
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler mqttOutbound() {
+        return MqttConfigUtils.mqttOutbound(TOPIC);
+    }
+
+
+    /*******************************消费者*******************************************/
+
+    /**
+     * MQTT信息通道(消费者)
+     */
+    @Bean(name = CHANNEL_NAME_IN)
+    public MessageChannel mqttInboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * MQTT消息订阅绑定(消费者)
+     */
+    @Bean(name = TOPIC + "Inbound")
+    public MessageProducer inbound() {
+        return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
+    }
+
+    /**
+     * MQTT消息处理器(消费者)
+     */
+    @Bean(name = TOPIC + "Handler")
+    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
+    public MessageHandler handler() {
+        return MqttConfigUtils.handler();
+    }
+
+}

+ 26 - 0
device-api-service/src/main/java/com/xy/consumer/device/push/msg/DevicePushMsgProducer.java

@@ -0,0 +1,26 @@
+package com.xy.consumer.device.push.msg;
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+/**
+ * 生产者接口
+ */
+@Component
+@MessagingGateway(defaultRequestChannel = DevicePushMsgMqttConfiguration.CHANNEL_NAME_OUT)
+public interface DevicePushMsgProducer {
+
+    /**
+     * ata是发送消息的内容
+     * topic是消息发送的主题,就是配置文件的主题
+     * qos是mqtt 对消息处理机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
+     */
+
+    void sendToMqtt(String data);
+
+    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
+
+    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
+}

+ 8 - 0
device-api-service/src/main/java/com/xy/service/factory/device/impl/open/OpenDeviceFactoryImpl.java

@@ -1,9 +1,12 @@
 package com.xy.service.factory.device.impl.open;
 
 import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.xy.annotate.Factory;
+import com.xy.consumer.device.push.msg.DevicePushMsgMqttConfiguration;
+import com.xy.consumer.device.push.msg.DevicePushMsgProducer;
 import com.xy.device.EnumDeviceActiveStatus;
 import com.xy.device.EnumDeviceBusyStatus;
 import com.xy.device.EnumDeviceCharging;
@@ -55,6 +58,9 @@ public class OpenDeviceFactoryImpl implements DeviceFactory {
 
     private DeviceEventMsgServiceImpl deviceEventMsgService;
 
+    private DevicePushMsgProducer devicePushMsgProducer;
+
+
     @Override
     public R save(DeviceRegisterDto.Save save) {
         DeviceRegister deviceRegister = copy(DeviceRegister.class, save.setDeviceSn(save.getDeviceSn().toUpperCase()));
@@ -221,6 +227,8 @@ public class OpenDeviceFactoryImpl implements DeviceFactory {
                             .setDeviceId(deviceInfo.getDeviceId())
                             .setFaultLevel(Integer.valueOf(sysCodeConfigureRedis.getExpand()));
                     deviceInfoService.updateById(updateDeviceInfo);
+                    //设备故障消息推送
+                    devicePushMsgProducer.sendToMqtt(JSONUtil.toJsonStr(deviceEventMsg), DevicePushMsgMqttConfiguration.TOPIC, 1);
                 }
             }
         }

+ 5 - 4
device-start/src/main/resources/bootstrap-uat.yml

@@ -1,9 +1,10 @@
 spring:
   mqtt:
     topics:
-      - {topic: "disConnectedNotify", handler: com.xy.consumer.disconnect.DisconnectedConsumer}
-      - {topic: "connectedNotify", handler: com.xy.consumer.connected.ConnectedConsumer}
-      - {topic: "deviceDataNotify", handler: com.xy.consumer.data.DataConsumer}
+      - { topic: "disConnectedNotify", handler: com.xy.consumer.disconnect.DisconnectedConsumer }
+      - { topic: "connectedNotify", handler: com.xy.consumer.connected.ConnectedConsumer }
+      - { topic: "deviceDataNotify", handler: com.xy.consumer.data.DataConsumer }
+      - { topic: "DevicePushMsgTopic", handler: com.xy.consumer.device.push.msg.DevicePushMsgConsumer }
 
 #微服务相关配置
 cloud:
@@ -20,4 +21,4 @@ xxl:
       port: 9061
 
 #雪花ID唯一标识
-workerId: 10
+workerId: 10