Pārlūkot izejas kodu

支付宝设备重启、运营状态修改、上下线

李进 2 gadi atpakaļ
vecāks
revīzija
0c5b780641

+ 61 - 0
device-api-service/src/main/java/com/xy/alipay/AliPayUtils.java

@@ -0,0 +1,61 @@
+package com.xy.alipay;
+
+import cn.hutool.json.JSONObject;
+import com.xy.consumer.connected.ConnectedMqttConfiguration;
+import com.xy.consumer.connected.ConnectedProducer;
+import com.xy.consumer.disconnect.DisconnectedMqttConfiguration;
+import com.xy.consumer.disconnect.DisconnectedProducer;
+import com.xy.utils.SpringBeanUtils;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * 支付宝设备工具类
+ */
+public class AliPayUtils {
+
+    /**
+     * 设备上线
+     *
+     * @param deviceId
+     */
+    public static void connectedNotify(Long deviceId) {
+        connectedNotify(Arrays.asList(deviceId));
+    }
+
+    /**
+     * 设备上线
+     *
+     * @param deviceIds
+     */
+    public static void connectedNotify(List<Long> deviceIds) {
+        ConnectedProducer connectedProducer = SpringBeanUtils.getBean(ConnectedProducer.class);
+        deviceIds.forEach(deviceId -> {
+            JSONObject jsonObject = new JSONObject().set("clientid", deviceId);
+            connectedProducer.sendToMqtt(jsonObject.toString(), ConnectedMqttConfiguration.TOPIC, 1);
+        });
+    }
+
+    /**
+     * 设备下线
+     *
+     * @param deviceId
+     */
+    public static void disConnectedNotify(Long deviceId) {
+        disConnectedNotify(Arrays.asList(deviceId));
+    }
+
+    /**
+     * 设备下线
+     *
+     * @param deviceIds
+     */
+    public static void disConnectedNotify(List<Long> deviceIds) {
+        DisconnectedProducer disconnectedProducer = SpringBeanUtils.getBean(DisconnectedProducer.class);
+        deviceIds.forEach(deviceId -> {
+            JSONObject jsonObject = new JSONObject().set("clientid", deviceId);
+            disconnectedProducer.sendToMqtt(jsonObject.toString(), DisconnectedMqttConfiguration.TOPIC, 1);
+        });
+    }
+}

+ 24 - 0
device-api-service/src/main/java/com/xy/consumer/connected/ConnectedMqttConfiguration.java

@@ -24,6 +24,30 @@ public class ConnectedMqttConfiguration {
      */
     public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
 
+    /**
+     * 出站通道名(生产者)发布的bean名称
+     */
+    public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
+
+    /*******************************生产者*******************************************/
+
+    /**
+     * MQTT信息通道(生产者)
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * MQTT消息处理器(生产者)
+     */
+    @Bean(name = TOPIC + "MqttOutbound")
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler mqttOutbound() {
+        return MqttConfigUtils.mqttOutbound(TOPIC);
+    }
+
 
     /*******************************消费者*******************************************/
 

+ 26 - 0
device-api-service/src/main/java/com/xy/consumer/connected/ConnectedProducer.java

@@ -0,0 +1,26 @@
+package com.xy.consumer.connected;
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+/**
+ * 生产者接口
+ */
+@Component
+@MessagingGateway(defaultRequestChannel = ConnectedMqttConfiguration.CHANNEL_NAME_OUT)
+public interface ConnectedProducer {
+
+    /**
+     * ata是发送消息的内容
+     * topic是消息发送的主题,就是配置文件的主题
+     * qos是mqtt 对消息处理机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
+     */
+
+    void sendToMqtt(String data);
+
+    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
+
+    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
+}

+ 24 - 0
device-api-service/src/main/java/com/xy/consumer/disconnect/DisconnectedMqttConfiguration.java

@@ -24,6 +24,30 @@ public class DisconnectedMqttConfiguration {
      */
     public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
 
+    /**
+     * 出站通道名(生产者)发布的bean名称
+     */
+    public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
+
+    /*******************************生产者*******************************************/
+
+    /**
+     * MQTT信息通道(生产者)
+     */
+    @Bean(name = CHANNEL_NAME_OUT)
+    public MessageChannel mqttOutboundChannel() {
+        return new DirectChannel();
+    }
+
+    /**
+     * MQTT消息处理器(生产者)
+     */
+    @Bean(name = TOPIC + "MqttOutbound")
+    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
+    public MessageHandler mqttOutbound() {
+        return MqttConfigUtils.mqttOutbound(TOPIC);
+    }
+
 
     /*******************************消费者*******************************************/
 

+ 26 - 0
device-api-service/src/main/java/com/xy/consumer/disconnect/DisconnectedProducer.java

@@ -0,0 +1,26 @@
+package com.xy.consumer.disconnect;
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+/**
+ * 生产者接口
+ */
+@Component
+@MessagingGateway(defaultRequestChannel = DisconnectedMqttConfiguration.CHANNEL_NAME_OUT)
+public interface DisconnectedProducer {
+
+    /**
+     * ata是发送消息的内容
+     * topic是消息发送的主题,就是配置文件的主题
+     * qos是mqtt 对消息处理机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
+     */
+
+    void sendToMqtt(String data);
+
+    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
+
+    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
+}