浏览代码

mqtt改版

李进 1 年之前
父节点
当前提交
18434b9cca
共有 29 个文件被更改,包括 128 次插入714 次删除
  1. 10 0
      device-api-cloud/src/main/java/com/xy/feign/DeviceMqttConsumerFeign.java
  2. 5 6
      device-api-service/pom.xml
  3. 0 18
      device-api-service/src/main/java/com/xy/config/DeviceThreadPoolConfig.java
  4. 0 45
      device-api-service/src/main/java/com/xy/consumer/cmd/CmdMqttConfiguration.java
  5. 0 26
      device-api-service/src/main/java/com/xy/consumer/cmd/CmdProducer.java
  6. 0 62
      device-api-service/src/main/java/com/xy/consumer/connected/ConnectedConsumer.java
  7. 0 79
      device-api-service/src/main/java/com/xy/consumer/connected/ConnectedMqttConfiguration.java
  8. 0 26
      device-api-service/src/main/java/com/xy/consumer/connected/ConnectedProducer.java
  9. 0 82
      device-api-service/src/main/java/com/xy/consumer/data/DataConsumer.java
  10. 0 55
      device-api-service/src/main/java/com/xy/consumer/data/DataMqttConfiguration.java
  11. 0 79
      device-api-service/src/main/java/com/xy/consumer/device/push/msg/DevicePushMsgMqttConfiguration.java
  12. 0 26
      device-api-service/src/main/java/com/xy/consumer/device/push/msg/DevicePushMsgProducer.java
  13. 0 34
      device-api-service/src/main/java/com/xy/consumer/disconnect/DisconnectedConsumer.java
  14. 0 79
      device-api-service/src/main/java/com/xy/consumer/disconnect/DisconnectedMqttConfiguration.java
  15. 0 26
      device-api-service/src/main/java/com/xy/consumer/disconnect/DisconnectedProducer.java
  16. 14 14
      device-api-service/src/main/java/com/xy/service/MqttServiceImpl.java
  17. 3 3
      device-api-service/src/main/java/com/xy/service/factory/cmd/CmdService.java
  18. 5 5
      device-api-service/src/main/java/com/xy/service/factory/cmd/impl/QualityCmdServiceImpl.java
  19. 4 4
      device-api-service/src/main/java/com/xy/service/factory/cmd/impl/SetsCmdServiceImpl.java
  20. 4 4
      device-api-service/src/main/java/com/xy/service/factory/cmd/impl/TaskCmdServiceImpl.java
  21. 4 11
      device-api-service/src/main/java/com/xy/service/factory/device/impl/alipay/AliPayOpenDeviceFatoryImpl.java
  22. 9 11
      device-api-service/src/main/java/com/xy/service/factory/device/impl/open/OpenDeviceFactoryImpl.java
  23. 25 0
      device-api/src/main/java/com/xy/dto/DeviceMqttDto.java
  24. 1 1
      device-api/src/main/java/com/xy/dto/MqttCmdDto.java
  25. 38 0
      device-api/src/main/java/com/xy/service/DeviceMqttConsumer.java
  26. 5 1
      device-start/pom.xml
  27. 0 8
      device-start/src/main/resources/bootstrap-prod.yml
  28. 0 8
      device-start/src/main/resources/bootstrap-uat.yml
  29. 1 1
      device-start/src/main/resources/bootstrap.yml

+ 10 - 0
device-api-cloud/src/main/java/com/xy/feign/DeviceMqttConsumerFeign.java

@@ -0,0 +1,10 @@
+package com.xy.feign;
+
+import com.xy.FeignInterceptor;
+import com.xy.consts.ServiceConsts;
+import com.xy.service.DeviceMqttConsumer;
+import org.springframework.cloud.openfeign.FeignClient;
+
+@FeignClient(value = ServiceConsts.SERVICE_NAME, configuration = FeignInterceptor.class)
+public interface DeviceMqttConsumerFeign extends DeviceMqttConsumer {
+}

+ 5 - 6
device-api-service/pom.xml

@@ -49,12 +49,16 @@
             <artifactId>merc-api</artifactId>
             <version>1.0</version>
         </dependency>
-
         <dependency>
             <groupId>com.xy</groupId>
             <artifactId>msg-api</artifactId>
             <version>1.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.xy</groupId>
+            <artifactId>mqtt-api</artifactId>
+            <version>1.0</version>
+        </dependency>
 
         <dependency>
             <groupId>com.xy</groupId>
@@ -72,11 +76,6 @@
             <artifactId>xy-server-web</artifactId>
             <version>1.0</version>
         </dependency>
-        <dependency>
-            <groupId>com.xy</groupId>
-            <artifactId>xy-mqtt</artifactId>
-            <version>1.0</version>
-        </dependency>
         <dependency>
             <groupId>com.xy</groupId>
             <artifactId>xy-job</artifactId>

+ 0 - 18
device-api-service/src/main/java/com/xy/config/DeviceThreadPoolConfig.java

@@ -11,8 +11,6 @@ public class DeviceThreadPoolConfig {
 
     public static final String DEVICE_COMMON_POLL = "deviceCommonPoll";
 
-    public static final String DEVICE_DATA_POLL = "deviceDataPoll";
-
     public static final String DEVICE_NETWORK_POLL = "deviceNetWorkPoll";
 
     public static final String ALIPAY_DEVICE_DETAIL = "alipayDeviceDetail";
@@ -35,22 +33,6 @@ public class DeviceThreadPoolConfig {
                 .builder();
     }
 
-    /**
-     * 统计mqtt消费线程池
-     */
-    @DynamicTp
-    @Bean(DEVICE_DATA_POLL)
-    public ThreadPoolTaskExecutor deviceDataPoll() {
-        int coreSize = 5;
-        return ThreadPoolUtils.newPoll()
-                .name(DEVICE_DATA_POLL)
-                .coreSize(coreSize)
-                .maxSize(coreSize * 10)
-                .keepAlive(30)
-                .queueSize(coreSize * 10)
-                .builder();
-    }
-
     /**
      * 上下线mqtt消费线程池
      */

+ 0 - 45
device-api-service/src/main/java/com/xy/consumer/cmd/CmdMqttConfiguration.java

@@ -1,45 +0,0 @@
-package com.xy.consumer.cmd;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class CmdMqttConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "cmdMqtt";
-
-    /**
-     * 出站通道名(生产者)发布的bean名称
-     */
-    public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
-
-    /*******************************生产者*******************************************/
-
-    /**
-     * MQTT信息通道(生产者)
-     */
-    @Bean(name = CHANNEL_NAME_OUT)
-    public MessageChannel mqttOutboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息处理器(生产者)
-     */
-    @Bean(name = TOPIC + "MqttOutbound")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
-    public MessageHandler mqttOutbound() {
-        return MqttConfigUtils.mqttOutbound(TOPIC);
-    }
-
-}

+ 0 - 26
device-api-service/src/main/java/com/xy/consumer/cmd/CmdProducer.java

@@ -1,26 +0,0 @@
-package com.xy.consumer.cmd;
-
-import org.springframework.integration.annotation.MessagingGateway;
-import org.springframework.integration.mqtt.support.MqttHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.stereotype.Component;
-
-/**
- * 生产者接口
- */
-@Component
-@MessagingGateway(defaultRequestChannel = CmdMqttConfiguration.CHANNEL_NAME_OUT)
-public interface CmdProducer {
-
-    /**
-     * ata是发送消息的内容
-     * topic是消息发送的主题,就是配置文件的主题
-     * qos是mqtt 对消息处理机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
-     */
-
-    void sendToMqtt(String data);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
-}

+ 0 - 62
device-api-service/src/main/java/com/xy/consumer/connected/ConnectedConsumer.java

@@ -1,62 +0,0 @@
-package com.xy.consumer.connected;
-
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
-import com.xy.annotate.MqttConsumerAsyn;
-import com.xy.config.DeviceThreadPoolConfig;
-import com.xy.consumer.MqttConsumer;
-import com.xy.device.EnumDeviceOnlineStatus;
-import com.xy.dto.DeviceNetRecordDto;
-import com.xy.dto.DeviceStatusDto;
-import com.xy.service.DeviceNetRecordServiceImpl;
-import com.xy.service.DeviceStatusServiceImpl;
-import com.xy.utils.SysDictUtils;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-/**
- * <p>
- * 设备上线监听消费者
- * </p>
- *
- * @author lijin
- * @since 2022-12-27
- */
-@Slf4j
-@Service
-@AllArgsConstructor
-@MqttConsumerAsyn(value = DeviceThreadPoolConfig.DEVICE_NETWORK_POLL, isFailSaveEs = false)
-public class ConnectedConsumer implements MqttConsumer {
-
-    private DeviceStatusServiceImpl deviceStatusService;
-
-    private DeviceNetRecordServiceImpl deviceNetRecordService;
-
-    @Override
-    public boolean message(String topic, String payload) {
-        Integer value = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
-        return exc(payload, value);
-    }
-
-    public boolean exc(String payload, int netState) {
-        JSONObject jsonObject = JSONUtil.parseObj(payload);
-        String clientid = jsonObject.getStr("clientid");
-        Long deviceId;
-        try {
-            deviceId = Long.valueOf(clientid);
-        } catch (Exception e) {
-            return true;
-        }
-        //上报状态
-        DeviceStatusDto.Up up = new DeviceStatusDto.Up().setDeviceId(deviceId);
-        up.setNetState(netState);
-        deviceStatusService.up(up).getCode();
-        //添加联网记录
-        DeviceNetRecordDto.Save save = new DeviceNetRecordDto.Save()
-                .setDeviceId(deviceId);
-        save.setNetStatus(netState);
-        deviceNetRecordService.save(save);
-        return true;
-    }
-}

+ 0 - 79
device-api-service/src/main/java/com/xy/consumer/connected/ConnectedMqttConfiguration.java

@@ -1,79 +0,0 @@
-package com.xy.consumer.connected;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.core.MessageProducer;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class ConnectedMqttConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "connectedNotify";
-
-    /**
-     * 入站通道名(消费者)订阅的bean名称
-     */
-    public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
-
-    /**
-     * 出站通道名(生产者)发布的bean名称
-     */
-    public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
-
-    /*******************************生产者*******************************************/
-
-    /**
-     * MQTT信息通道(生产者)
-     */
-    @Bean(name = CHANNEL_NAME_OUT)
-    public MessageChannel mqttOutboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息处理器(生产者)
-     */
-    @Bean(name = TOPIC + "MqttOutbound")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
-    public MessageHandler mqttOutbound() {
-        return MqttConfigUtils.mqttOutbound(TOPIC);
-    }
-
-
-    /*******************************消费者*******************************************/
-
-    /**
-     * MQTT信息通道(消费者)
-     */
-    @Bean(name = CHANNEL_NAME_IN)
-    public MessageChannel mqttInboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息订阅绑定(消费者)
-     */
-    @Bean(name = TOPIC + "Inbound")
-    public MessageProducer inbound() {
-        return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
-    }
-
-    /**
-     * MQTT消息处理器(消费者)
-     */
-    @Bean(name = TOPIC + "Handler")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
-    public MessageHandler handler() {
-        return MqttConfigUtils.handler();
-    }
-
-}

+ 0 - 26
device-api-service/src/main/java/com/xy/consumer/connected/ConnectedProducer.java

@@ -1,26 +0,0 @@
-package com.xy.consumer.connected;
-
-import org.springframework.integration.annotation.MessagingGateway;
-import org.springframework.integration.mqtt.support.MqttHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.stereotype.Component;
-
-/**
- * 生产者接口
- */
-@Component
-@MessagingGateway(defaultRequestChannel = ConnectedMqttConfiguration.CHANNEL_NAME_OUT)
-public interface ConnectedProducer {
-
-    /**
-     * ata是发送消息的内容
-     * topic是消息发送的主题,就是配置文件的主题
-     * qos是mqtt 对消息处理机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
-     */
-
-    void sendToMqtt(String data);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
-}

+ 0 - 82
device-api-service/src/main/java/com/xy/consumer/data/DataConsumer.java

@@ -1,82 +0,0 @@
-package com.xy.consumer.data;
-
-import cn.hutool.json.JSONObject;
-import cn.hutool.json.JSONUtil;
-import com.xy.annotate.MqttConsumerAsyn;
-import com.xy.config.DeviceThreadPoolConfig;
-import com.xy.consumer.MqttConsumer;
-import com.xy.dto.DataMqttVo;
-import com.xy.dto.DeviceDataDto;
-import com.xy.service.DeviceDataServiceImpl;
-import com.xy.service.DeviceStatusServiceImpl;
-import com.xy.utils.Emptys;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-import java.util.List;
-
-/**
- * <p>
- * 数据统计消费者
- * </p>
- *
- * @author lijin
- * @since 2022-03-11
- */
-@Slf4j
-@Service
-@AllArgsConstructor
-@MqttConsumerAsyn(DeviceThreadPoolConfig.DEVICE_DATA_POLL)
-public class DataConsumer implements MqttConsumer {
-
-    private DeviceDataServiceImpl deviceDataService;
-
-    private DeviceStatusServiceImpl deviceStatusService;
-
-    @Override
-    public boolean message(String topic, String payload) {
-        JSONObject jsonObject = JSONUtil.parseObj(payload);
-        DataMqttVo dataMqttVo = jsonObject.toBean(DataMqttVo.class);
-        Boolean type = dataMqttVo.getType();
-        List<DataMqttVo.Goods> goods = dataMqttVo.getGoods();
-        Integer money = dataMqttVo.getMoney();
-        int number = 0;
-        if (Emptys.check(goods)) {
-            for (DataMqttVo.Goods good : goods) {
-                number = number + good.getNumber();
-            }
-        }
-        //设备统计
-        DeviceDataDto.SaveOrAccum saveOrAccum = new DeviceDataDto.SaveOrAccum()
-                .setDeviceId(dataMqttVo.getDeviceId());
-        saveOrAccum.setMercId(dataMqttVo.getMercId());
-        if (type) {
-            //购买
-            if (dataMqttVo.getIsError()) {
-                //异常单
-                saveOrAccum.setRiskCount(1);
-            } else {
-                if (money == null || money == 0) {
-                    //0元单
-                    saveOrAccum.setZeroCount(1);
-                } else {
-                    //正常单,累计商品数量
-                    saveOrAccum.setSalesMoney(money);
-                    saveOrAccum.setSalesCount(1);
-                    saveOrAccum.setGoodsCount(number);
-                }
-            }
-        } else {
-            //退款
-            saveOrAccum.setRefundMoney(money);
-            saveOrAccum.setRefundCount(1);
-        }
-        deviceDataService.saveOrAccum(saveOrAccum);
-        //修改库存
-        if (number > 0) {
-            deviceStatusService.upStock(dataMqttVo.getDeviceId(), type ? (~(number - 1)) : number);
-        }
-        return true;
-    }
-}

+ 0 - 55
device-api-service/src/main/java/com/xy/consumer/data/DataMqttConfiguration.java

@@ -1,55 +0,0 @@
-package com.xy.consumer.data;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.core.MessageProducer;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class DataMqttConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "deviceDataNotify";
-
-    /**
-     * 入站通道名(消费者)订阅的bean名称
-     */
-    public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
-
-
-    /*******************************消费者*******************************************/
-
-    /**
-     * MQTT信息通道(消费者)
-     */
-    @Bean(name = CHANNEL_NAME_IN)
-    public MessageChannel mqttInboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息订阅绑定(消费者)
-     */
-    @Bean(name = TOPIC + "Inbound")
-    public MessageProducer inbound() {
-        return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
-    }
-
-    /**
-     * MQTT消息处理器(消费者)
-     */
-    @Bean(name = TOPIC + "Handler")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
-    public MessageHandler handler() {
-        return MqttConfigUtils.handler();
-    }
-
-}

+ 0 - 79
device-api-service/src/main/java/com/xy/consumer/device/push/msg/DevicePushMsgMqttConfiguration.java

@@ -1,79 +0,0 @@
-package com.xy.consumer.device.push.msg;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.core.MessageProducer;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class DevicePushMsgMqttConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "DevicePushMsgTopic";
-
-    /**
-     * 入站通道名(消费者)订阅的bean名称
-     */
-    public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
-
-    /**
-     * 出站通道名(生产者)发布的bean名称
-     */
-    public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
-
-    /*******************************生产者*******************************************/
-
-    /**
-     * MQTT信息通道(生产者)
-     */
-    @Bean(name = CHANNEL_NAME_OUT)
-    public MessageChannel mqttOutboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息处理器(生产者)
-     */
-    @Bean(name = TOPIC + "MqttOutbound")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
-    public MessageHandler mqttOutbound() {
-        return MqttConfigUtils.mqttOutbound(TOPIC);
-    }
-
-
-    /*******************************消费者*******************************************/
-
-    /**
-     * MQTT信息通道(消费者)
-     */
-    @Bean(name = CHANNEL_NAME_IN)
-    public MessageChannel mqttInboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息订阅绑定(消费者)
-     */
-    @Bean(name = TOPIC + "Inbound")
-    public MessageProducer inbound() {
-        return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
-    }
-
-    /**
-     * MQTT消息处理器(消费者)
-     */
-    @Bean(name = TOPIC + "Handler")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
-    public MessageHandler handler() {
-        return MqttConfigUtils.handler();
-    }
-
-}

+ 0 - 26
device-api-service/src/main/java/com/xy/consumer/device/push/msg/DevicePushMsgProducer.java

@@ -1,26 +0,0 @@
-package com.xy.consumer.device.push.msg;
-
-import org.springframework.integration.annotation.MessagingGateway;
-import org.springframework.integration.mqtt.support.MqttHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.stereotype.Component;
-
-/**
- * 生产者接口
- */
-@Component
-@MessagingGateway(defaultRequestChannel = DevicePushMsgMqttConfiguration.CHANNEL_NAME_OUT)
-public interface DevicePushMsgProducer {
-
-    /**
-     * ata是发送消息的内容
-     * topic是消息发送的主题,就是配置文件的主题
-     * qos是mqtt 对消息处理机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
-     */
-
-    void sendToMqtt(String data);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
-}

+ 0 - 34
device-api-service/src/main/java/com/xy/consumer/disconnect/DisconnectedConsumer.java

@@ -1,34 +0,0 @@
-package com.xy.consumer.disconnect;
-
-import com.xy.annotate.MqttConsumerAsyn;
-import com.xy.config.DeviceThreadPoolConfig;
-import com.xy.consumer.MqttConsumer;
-import com.xy.consumer.connected.ConnectedConsumer;
-import com.xy.device.EnumDeviceOnlineStatus;
-import com.xy.utils.SysDictUtils;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-/**
- * <p>
- * 设备下线监听消费者
- * </p>
- *
- * @author lijin
- * @since 2022-12-27
- */
-@Slf4j
-@Service
-@AllArgsConstructor
-@MqttConsumerAsyn(value = DeviceThreadPoolConfig.DEVICE_NETWORK_POLL, isFailSaveEs = false)
-public class DisconnectedConsumer implements MqttConsumer {
-
-    private ConnectedConsumer connectedConsumer;
-
-    @Override
-    public boolean message(String topic, String payload) {
-        Integer value = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
-        return connectedConsumer.exc(payload, value);
-    }
-}

+ 0 - 79
device-api-service/src/main/java/com/xy/consumer/disconnect/DisconnectedMqttConfiguration.java

@@ -1,79 +0,0 @@
-package com.xy.consumer.disconnect;
-
-import com.xy.configuration.MqttConfigUtils;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.integration.annotation.IntegrationComponentScan;
-import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.core.MessageProducer;
-import org.springframework.messaging.MessageChannel;
-import org.springframework.messaging.MessageHandler;
-
-@Configuration
-@IntegrationComponentScan
-public class DisconnectedMqttConfiguration {
-
-    /**
-     * topic
-     */
-    public final static String TOPIC = "disConnectedNotify";
-
-    /**
-     * 入站通道名(消费者)订阅的bean名称
-     */
-    public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
-
-    /**
-     * 出站通道名(生产者)发布的bean名称
-     */
-    public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
-
-    /*******************************生产者*******************************************/
-
-    /**
-     * MQTT信息通道(生产者)
-     */
-    @Bean(name = CHANNEL_NAME_OUT)
-    public MessageChannel mqttOutboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息处理器(生产者)
-     */
-    @Bean(name = TOPIC + "MqttOutbound")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
-    public MessageHandler mqttOutbound() {
-        return MqttConfigUtils.mqttOutbound(TOPIC);
-    }
-
-
-    /*******************************消费者*******************************************/
-
-    /**
-     * MQTT信息通道(消费者)
-     */
-    @Bean(name = CHANNEL_NAME_IN)
-    public MessageChannel mqttInboundChannel() {
-        return new DirectChannel();
-    }
-
-    /**
-     * MQTT消息订阅绑定(消费者)
-     */
-    @Bean(name = TOPIC + "Inbound")
-    public MessageProducer inbound() {
-        return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
-    }
-
-    /**
-     * MQTT消息处理器(消费者)
-     */
-    @Bean(name = TOPIC + "Handler")
-    @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
-    public MessageHandler handler() {
-        return MqttConfigUtils.handler();
-    }
-
-}

+ 0 - 26
device-api-service/src/main/java/com/xy/consumer/disconnect/DisconnectedProducer.java

@@ -1,26 +0,0 @@
-package com.xy.consumer.disconnect;
-
-import org.springframework.integration.annotation.MessagingGateway;
-import org.springframework.integration.mqtt.support.MqttHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.stereotype.Component;
-
-/**
- * 生产者接口
- */
-@Component
-@MessagingGateway(defaultRequestChannel = DisconnectedMqttConfiguration.CHANNEL_NAME_OUT)
-public interface DisconnectedProducer {
-
-    /**
-     * ata是发送消息的内容
-     * topic是消息发送的主题,就是配置文件的主题
-     * qos是mqtt 对消息处理机制分为0,1,2 其中0表示的是订阅者没收到消息不会再次发送,消息会丢失,1表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息,2相比多了一次去重的动作,确保订阅者收到的消息有一次
-     */
-
-    void sendToMqtt(String data);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
-
-    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);
-}

+ 14 - 14
device-api-service/src/main/java/com/xy/service/MqttServiceImpl.java

@@ -12,11 +12,7 @@ import com.xy.collections.list.JArrayList;
 import com.xy.collections.list.JList;
 import com.xy.collections.map.JHashMap;
 import com.xy.collections.map.JMap;
-import com.xy.consumer.cmd.CmdProducer;
-import com.xy.dto.AckMqttDto;
-import com.xy.dto.CommandMqtt;
-import com.xy.dto.MqttDto;
-import com.xy.dto.PaterDto;
+import com.xy.dto.*;
 import com.xy.entity.DeviceInfo;
 import com.xy.entity.MqttCommand;
 import com.xy.error.CommRuntimeException;
@@ -47,7 +43,7 @@ import static com.xy.utils.PlusBeans.*;
 @RequiredArgsConstructor
 public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand> implements MqttService {
 
-    private final CmdProducer cmdProducer;
+    private final DeviceMqttSendService deviceMqttSendService;
 
     private final RedisService<String> redisService;
 
@@ -71,7 +67,7 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
         return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN);
     }
 
-    public List<Tuple.Tuple3<Long, Long, Boolean>> sendMqtt(List<MqttDto> mqttDtos) {
+    public List<Tuple.Tuple3<Long, Long, Boolean>> sendMqtt(List<MqttCmdDto> mqttDtos) {
         String time = DataTime.getSring();
         List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>(mqttDtos.size());
         mqttDtos.forEach(mqttDto -> {
@@ -98,7 +94,11 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
                 //指令记录
                 saveMqttCommand(mqttDto, topic, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_EXE_SUCCESS, null);
                 //发送消息
-                cmdProducer.sendToMqtt(value.toString(), topic, mqttDto.getLevel());
+                deviceMqttSendService.cmd(new MqttDto.CmdParams()
+                        .setQos(mqttDto.getLevel())
+                        .setData(value.toString())
+                        .setTopic(topic)
+                );
                 list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), true));
             } catch (Exception e) {
                 //指令记录
@@ -170,17 +170,17 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
 
     @ApiOperation("指令结果通知")
     @PostMapping("commandResultBack")
-    public R commandResultBack(@RequestBody @Validated MqttDto.ResultBack resultBack) {
+    public R commandResultBack(@RequestBody @Validated MqttCmdDto.ResultBack resultBack) {
         return FactoryUtils.getService(CmdService.class, resultBack.getCmdType()).resultBack(resultBack);
     }
 
     @PostMapping("snByCmdAndResult")
     @ApiOperation("sn查询指令和结果数据")
-    public R<MqttDto.Vo3> snByCmdAndResult(@RequestBody MqttDto.SnByCmdAndResult snByCmdAndResult) {
+    public R<MqttCmdDto.Vo3> snByCmdAndResult(@RequestBody MqttCmdDto.SnByCmdAndResult snByCmdAndResult) {
         MqttCommand mqttCommand = getById(snByCmdAndResult.getSn());
         String key = String.format(keyStr, snByCmdAndResult.getSn());
         Map<String, String> map = redisService.getMap(key);
-        MqttDto.Vo3 vo3 = copy(MqttDto.Vo3.class, mqttCommand);
+        MqttCmdDto.Vo3 vo3 = copy(MqttCmdDto.Vo3.class, mqttCommand);
         if (Emptys.check(map)) {
             String result = map.get("result");
             vo3.setResult(JSONUtil.parseObj(result));
@@ -190,7 +190,7 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
 
     @PostMapping("page")
     @ApiOperation("分页查询")
-    public R<PageBean<MqttDto.Vo2>> page(@RequestBody MqttDto.Page page) {
+    public R<PageBean<MqttCmdDto.Vo2>> page(@RequestBody MqttCmdDto.Page page) {
         LambdaQueryWrapper<MqttCommand> lambdaQueryWrapper = new MybatisPlusQuery().eqWrapper(page, MqttCommand.class)
                 .like(MqttCommand::getError)
                 .like(MqttCommand::getValue)
@@ -204,7 +204,7 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
                 .orderByDesc(MqttCommand::getSendTime);
         PageBean pageBean = page.getPage();
         IPage<MqttCommand> iPage = page(toIPage(pageBean), lambdaQueryWrapper);
-        return R.ok(toPageBean(MqttDto.Vo2.class, iPage));
+        return R.ok(toPageBean(MqttCmdDto.Vo2.class, iPage));
     }
 
 
@@ -216,7 +216,7 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
      * @param status  状态
      * @param error   错误信息
      */
-    private void saveMqttCommand(MqttDto mqttDto, String topic, JSONObject value, MqttCommandStatusEnum status, String error) {
+    private void saveMqttCommand(MqttCmdDto mqttDto, String topic, JSONObject value, MqttCommandStatusEnum status, String error) {
         PaterDto pater = mqttDto.getPater();
         MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater)
                 .setSn(pater.getSn())

+ 3 - 3
device-api-service/src/main/java/com/xy/service/factory/cmd/CmdService.java

@@ -1,6 +1,6 @@
 package com.xy.service.factory.cmd;
 
-import com.xy.dto.MqttDto;
+import com.xy.dto.MqttCmdDto;
 import com.xy.utils.R;
 
 import java.util.List;
@@ -16,7 +16,7 @@ public interface CmdService {
      * @param mqttDtos
      * @return
      */
-    R senCommand(List<MqttDto> mqttDtos);
+    R senCommand(List<MqttCmdDto> mqttDtos);
 
     /**
      * 结果通知
@@ -24,5 +24,5 @@ public interface CmdService {
      * @param resultBack
      * @return
      */
-    R resultBack(MqttDto.ResultBack resultBack);
+    R resultBack(MqttCmdDto.ResultBack resultBack);
 }

+ 5 - 5
device-api-service/src/main/java/com/xy/service/factory/cmd/impl/QualityCmdServiceImpl.java

@@ -3,7 +3,7 @@ package com.xy.service.factory.cmd.impl;
 import cn.hutool.json.JSONObject;
 import com.xy.annotate.Factory;
 import com.xy.dto.DeviceQualityDto;
-import com.xy.dto.MqttDto;
+import com.xy.dto.MqttCmdDto;
 import com.xy.dto.PaterDto;
 import com.xy.entity.DeviceQuality;
 import com.xy.error.CommRuntimeException;
@@ -29,8 +29,8 @@ public class QualityCmdServiceImpl implements CmdService {
     private DeviceQualityServiceImpl deviceQualityService;
 
     @Override
-    public R senCommand(List<MqttDto> mqttDtos) {
-        Map<Integer, List<MqttDto>> map = new HashMap<>();
+    public R senCommand(List<MqttCmdDto> mqttDtos) {
+        Map<Integer, List<MqttCmdDto>> map = new HashMap<>();
         mqttDtos.forEach(mqttDto -> {
             PaterDto pater = mqttDto.getPater();
             JSONObject data = pater.getData();
@@ -57,10 +57,10 @@ public class QualityCmdServiceImpl implements CmdService {
     }
 
     @Override
-    public R resultBack(MqttDto.ResultBack resultBack) {
+    public R resultBack(MqttCmdDto.ResultBack resultBack) {
         DeviceQualityDto.QualityResultBack qualityResultBack = resultBack.getQualityResultBack();
         if (!Emptys.check(qualityResultBack)) {
-            return R.fail(LambdaUtils.getProperty(MqttDto.ResultBack::getQualityResultBack) + "不能为空");
+            return R.fail(LambdaUtils.getProperty(MqttCmdDto.ResultBack::getQualityResultBack) + "不能为空");
         }
         return deviceQualityService.qualityResultBack(qualityResultBack);
     }

+ 4 - 4
device-api-service/src/main/java/com/xy/service/factory/cmd/impl/SetsCmdServiceImpl.java

@@ -4,7 +4,7 @@ import cn.hutool.json.JSONObject;
 import com.xy.annotate.Factory;
 import com.xy.device.EnumMqttCmdTempletSets;
 import com.xy.dto.DeviceTempSetDto;
-import com.xy.dto.MqttDto;
+import com.xy.dto.MqttCmdDto;
 import com.xy.dto.PaterDto;
 import com.xy.entity.DeviceTempSet;
 import com.xy.service.DeviceTempSetServiceImpl;
@@ -24,8 +24,8 @@ public class SetsCmdServiceImpl implements CmdService {
     private DeviceTempSetServiceImpl deviceTempSetService;
 
     @Override
-    public R senCommand(List<MqttDto> mqttDtos) {
-        for (MqttDto mqttDto : mqttDtos) {
+    public R senCommand(List<MqttCmdDto> mqttDtos) {
+        for (MqttCmdDto mqttDto : mqttDtos) {
             PaterDto pater = mqttDto.getPater();
             JSONObject data = pater.getData();
             String type = data.getStr("type");
@@ -56,7 +56,7 @@ public class SetsCmdServiceImpl implements CmdService {
     }
 
     @Override
-    public R resultBack(MqttDto.ResultBack resultBack) {
+    public R resultBack(MqttCmdDto.ResultBack resultBack) {
         DeviceTempSetDto.DeviceTempResultBack deviceTempResultBack = resultBack.getDeviceTempResultBack();
         //温度设置指令
         if (resultBack.getCmdType().equals(EnumMqttCmdTempletSets.TEMPERATURE.getCode())) {

+ 4 - 4
device-api-service/src/main/java/com/xy/service/factory/cmd/impl/TaskCmdServiceImpl.java

@@ -6,7 +6,7 @@ import com.xy.device.EnumAppUpdateStatus;
 import com.xy.device.EnumMqttCmdTempletTask;
 import com.xy.dto.ActivityInfoDto;
 import com.xy.dto.DeviceVersionUpDto;
-import com.xy.dto.MqttDto;
+import com.xy.dto.MqttCmdDto;
 import com.xy.service.ActivityInfoService;
 import com.xy.service.DeviceVersionUpServiceImpl;
 import com.xy.service.factory.cmd.CmdService;
@@ -29,8 +29,8 @@ public class TaskCmdServiceImpl implements CmdService {
     private DeviceVersionUpServiceImpl deviceVersionUpService;
 
     @Override
-    public R senCommand(List<MqttDto> mqttDtos) {
-        for (MqttDto mqttDto : mqttDtos) {
+    public R senCommand(List<MqttCmdDto> mqttDtos) {
+        for (MqttCmdDto mqttDto : mqttDtos) {
             JSONObject data = mqttDto.getPater().getData();
             String type = data.getStr("type");
             //终止活动
@@ -56,7 +56,7 @@ public class TaskCmdServiceImpl implements CmdService {
     }
 
     @Override
-    public R resultBack(MqttDto.ResultBack resultBack) {
+    public R resultBack(MqttCmdDto.ResultBack resultBack) {
         DeviceVersionUpDto.DeviceVersionUpResultBack deviceVersionUpResultBack = resultBack.getDeviceVersionUpResultBack();
         //版本升级
         if (deviceVersionUpResultBack != null) {

+ 4 - 11
device-api-service/src/main/java/com/xy/service/factory/device/impl/alipay/AliPayOpenDeviceFatoryImpl.java

@@ -11,10 +11,6 @@ import com.xy.collections.list.JList;
 import com.xy.collections.map.JMap;
 import com.xy.config.DeviceThreadPoolConfig;
 import com.xy.constants.SpiResponseConst;
-import com.xy.consumer.connected.ConnectedMqttConfiguration;
-import com.xy.consumer.connected.ConnectedProducer;
-import com.xy.consumer.disconnect.DisconnectedMqttConfiguration;
-import com.xy.consumer.disconnect.DisconnectedProducer;
 import com.xy.device.EnumDeviceOnlineStatus;
 import com.xy.dto.*;
 import com.xy.dto.spi.DeviceAlarmNotifyDTO;
@@ -23,7 +19,6 @@ import com.xy.dto.spi.DeviceSetAttributesNotifyDTO;
 import com.xy.dto.spi.DeviceStatusChangeNotifyDTO;
 import com.xy.entity.DeviceInfo;
 import com.xy.entity.DeviceStatus;
-import com.xy.error.CommRuntimeException;
 import com.xy.service.*;
 import com.xy.service.factory.device.DeviceFactory;
 import com.xy.service.factory.device.impl.open.OpenDeviceFactoryImpl;
@@ -53,10 +48,6 @@ public class AliPayOpenDeviceFatoryImpl implements DeviceFactory, SpiDeviceServi
 
     private AlipayDeviceService alipayDeviceService;
 
-    private ConnectedProducer connectedProducer;
-
-    private DisconnectedProducer disconnectedProducer;
-
     private DeviceStatusServiceImpl deviceStatusService;
 
     private DeviceInfoServiceImpl deviceInfoService;
@@ -65,6 +56,8 @@ public class AliPayOpenDeviceFatoryImpl implements DeviceFactory, SpiDeviceServi
 
     private DeviceSysinfoServiceImpl deviceSysinfoService;
 
+    private DeviceMqttSendService deviceMqttSendService;
+
     @Override
     public R save(DeviceRegisterDto.Save save) {
         R r = openDeviceFactoryImpl.save(save);
@@ -155,9 +148,9 @@ public class AliPayOpenDeviceFatoryImpl implements DeviceFactory, SpiDeviceServi
         Integer value = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
         JSONObject jsonObject = new JSONObject().set("clientid", save.getDeviceId());
         if (save.getNetStatus().equals(value)) {
-            connectedProducer.sendToMqtt(jsonObject.toString(), ConnectedMqttConfiguration.TOPIC, 1);
+            deviceMqttSendService.connected(new MqttDto.RequestParams().setData(jsonObject.toString()));
         } else {
-            disconnectedProducer.sendToMqtt(jsonObject.toString(), DisconnectedMqttConfiguration.TOPIC, 1);
+            deviceMqttSendService.disconnected(new MqttDto.RequestParams().setData(jsonObject.toString()));
         }
         return R.ok();
     }

+ 9 - 11
device-api-service/src/main/java/com/xy/service/factory/device/impl/open/OpenDeviceFactoryImpl.java

@@ -10,8 +10,6 @@ import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.xy.annotate.Factory;
 import com.xy.collections.list.JArrayList;
 import com.xy.collections.map.JMap;
-import com.xy.consumer.device.push.msg.DevicePushMsgMqttConfiguration;
-import com.xy.consumer.device.push.msg.DevicePushMsgProducer;
 import com.xy.device.EnumDeviceActiveStatus;
 import com.xy.device.EnumDeviceBusyStatus;
 import com.xy.dto.*;
@@ -62,7 +60,7 @@ public class OpenDeviceFactoryImpl implements DeviceFactory {
 
     private DeviceEventMsgServiceImpl deviceEventMsgService;
 
-    private DevicePushMsgProducer devicePushMsgProducer;
+    private DeviceMqttSendService deviceMqttSendService;
 
     private DeviceChargingServiceImpl deviceChargingService;
 
@@ -201,13 +199,13 @@ public class OpenDeviceFactoryImpl implements DeviceFactory {
     @Override
     public R<List<Tuple.Tuple3<Long, Long, Boolean>>> senCommand(List<CommandMqtt> commandMqtts) {
         //发送指令
-        List<MqttDto> mqttDtos = new ArrayList<>();
+        List<MqttCmdDto> mqttDtos = new ArrayList<>();
         commandMqtts.forEach(commandMqtt -> {
             JSONObject templetObj = commandMqtt.getTemplet();
-            String debug = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDebug));
-            String level = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getLevel));
-            String delayTime = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDelayTime));
-            String timeout = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getTimeout));
+            String debug = templetObj.getStr(LambdaUtils.getProperty(MqttCmdDto::getDebug));
+            String level = templetObj.getStr(LambdaUtils.getProperty(MqttCmdDto::getLevel));
+            String delayTime = templetObj.getStr(LambdaUtils.getProperty(MqttCmdDto::getDelayTime));
+            String timeout = templetObj.getStr(LambdaUtils.getProperty(MqttCmdDto::getTimeout));
             String wkSn = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getWkSn));
             String actionType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getActionType));
             String cmdType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getCmdType));
@@ -219,7 +217,7 @@ public class OpenDeviceFactoryImpl implements DeviceFactory {
                     .setAck(Emptys.check(ack) && !ack.equals("$ack$") ? Boolean.valueOf(ack) : Emptys.check(ack) && ack.equals("$ack$") ? true : true)
                     .setWkSn(!Emptys.check(wkSn) ? mqttService.getWkSn() : "$wkSn$".equals(wkSn) ? mqttService.getWkSn() : Long.valueOf(wkSn))
                     .setData(data);
-            MqttDto mqttDto = new MqttDto()
+            MqttCmdDto mqttDto = new MqttCmdDto()
                     .setDeviceId(commandMqtt.getDeviceId())
                     .setDebug(Emptys.check(debug) && !debug.equals("$debug$") ? Boolean.valueOf(debug) : Emptys.check(debug) && debug.equals("$debug$") ? false : false)
                     .setLevel(Emptys.check(level) && !level.equals("$level$") ? Integer.valueOf(level) : Emptys.check(level) && level.equals("$level$") ? 1 : 1)
@@ -230,7 +228,7 @@ public class OpenDeviceFactoryImpl implements DeviceFactory {
         });
         List<Tuple.Tuple3<Long, Long, Boolean>> list = mqttService.sendMqtt(mqttDtos);
         //执行业务
-        Map<String, List<MqttDto>> map = new HashMap<>();
+        Map<String, List<MqttCmdDto>> map = new HashMap<>();
         mqttDtos.forEach(mqttDto -> {
             String cmdType = mqttDto.getPater().getCmdType();
             if (!map.containsKey(cmdType)) {
@@ -295,7 +293,7 @@ public class OpenDeviceFactoryImpl implements DeviceFactory {
         if (StrUtil.isEmpty(code)) {
             return;
         }
-        devicePushMsgProducer.sendToMqtt(JSONUtil.toJsonStr(deviceEventMsg), DevicePushMsgMqttConfiguration.TOPIC, 1);
+        deviceMqttSendService.devicePushMsg(new MqttDto.RequestParams().setData(JSONUtil.toJsonStr(deviceEventMsg)));
     }
 
     @Override

+ 25 - 0
device-api/src/main/java/com/xy/dto/DeviceMqttDto.java

@@ -0,0 +1,25 @@
+package com.xy.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.experimental.Accessors;
+
+import javax.validation.constraints.NotBlank;
+
+public class DeviceMqttDto {
+
+    @Data
+    @Accessors(chain = true)
+    public static class RequestParams {
+
+        @NotBlank(message = "topic不能为空")
+        @ApiModelProperty(value = "topic", required = true)
+        private String topic;
+
+        @NotBlank(message = "data不能为空")
+        @ApiModelProperty(value = "消息内容", required = true)
+        private String data;
+
+    }
+
+}

+ 1 - 1
device-api/src/main/java/com/xy/dto/MqttDto.java → device-api/src/main/java/com/xy/dto/MqttCmdDto.java

@@ -14,7 +14,7 @@ import java.time.LocalDateTime;
 
 @Data
 @Accessors(chain = true)
-public class MqttDto {
+public class MqttCmdDto {
 
     @ApiModelProperty(value = "设备id")
     private Long deviceId;

+ 38 - 0
device-api/src/main/java/com/xy/service/DeviceMqttConsumer.java

@@ -0,0 +1,38 @@
+package com.xy.service;
+
+import com.xy.annotate.RestMappingController;
+import com.xy.dto.DeviceMqttDto;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+
+/**
+ * 设备消费者 接口
+ */
+@RestMappingController("device-mqtt-consumer")
+public interface DeviceMqttConsumer {
+
+    /**
+     * 设备在线
+     *
+     * @param requestParams
+     */
+    @PostMapping("connected")
+    void connected(@RequestBody @Validated DeviceMqttDto.RequestParams requestParams);
+
+    /**
+     * 设备离线
+     *
+     * @param requestParams
+     */
+    @PostMapping("disconnect")
+    void disconnect(@RequestBody @Validated DeviceMqttDto.RequestParams requestParams);
+
+    /**
+     * 设备消息推送
+     *
+     * @param requestParams
+     */
+    @PostMapping("devicePushMsg")
+    void devicePushMsg(@RequestBody @Validated DeviceMqttDto.RequestParams requestParams);
+}

+ 5 - 1
device-start/pom.xml

@@ -50,12 +50,16 @@
             <artifactId>goods-api-cloud</artifactId>
             <version>1.0</version>
         </dependency>
-
         <dependency>
             <groupId>com.xy</groupId>
             <artifactId>data-api-cloud</artifactId>
             <version>1.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.xy</groupId>
+            <artifactId>mqtt-api-cloud</artifactId>
+            <version>1.0</version>
+        </dependency>
 
         <dependency>
             <groupId>com.xy</groupId>

+ 0 - 8
device-start/src/main/resources/bootstrap-prod.yml

@@ -1,11 +1,3 @@
-spring:
-  mqtt:
-    topics:
-      - { topic: "disConnectedNotify", handler: com.xy.consumer.disconnect.DisconnectedConsumer }
-      - { topic: "connectedNotify", handler: com.xy.consumer.connected.ConnectedConsumer }
-      - { topic: "deviceDataNotify", handler: com.xy.consumer.data.DataConsumer }
-      - { topic: "DevicePushMsgTopic", handler: com.xy.consumer.device.push.msg.DevicePushMsgConsumer }
-
 #微服务相关配置
 cloud:
   center:

+ 0 - 8
device-start/src/main/resources/bootstrap-uat.yml

@@ -1,11 +1,3 @@
-spring:
-  mqtt:
-    topics:
-      - { topic: "disConnectedNotify", handler: com.xy.consumer.disconnect.DisconnectedConsumer }
-      - { topic: "connectedNotify", handler: com.xy.consumer.connected.ConnectedConsumer }
-      - { topic: "deviceDataNotify", handler: com.xy.consumer.data.DataConsumer }
-      - { topic: "DevicePushMsgTopic", handler: com.xy.consumer.device.push.msg.DevicePushMsgConsumer }
-
 #微服务相关配置
 cloud:
   service:

+ 1 - 1
device-start/src/main/resources/bootstrap.yml

@@ -10,7 +10,7 @@ cloud:
   center:
     url: 119.96.213.127:9007
     config:
-      shared-configs: redis.yaml,mysql.yaml,mqtt.yaml,xxl-job.yaml,algorithm.yaml,xy-oss.yaml,alipay.yaml,cloudwalk.yaml,magic-api.yaml,knife4j.yaml
+      shared-configs: redis.yaml,mysql.yaml,xxl-job.yaml,algorithm.yaml,xy-oss.yaml,alipay.yaml,cloudwalk.yaml,magic-api.yaml,knife4j.yaml
       name: device
   service:
     name: dev-device