Browse Source

优化mqtt指令发送

李进 2 years ago
parent
commit
8b0ceb428a

+ 2 - 2
device-api-service/src/main/java/com/xy/entity/MqttCommand.java

@@ -53,9 +53,9 @@ public class MqttCommand {
     private String execTime;
 
     /**
-     * 指令过期时间
+     * 指令有效期
      */
-    private String timeout;
+    private Integer timeout;
 
     /**
      * 接收指令的topic

+ 1 - 1
device-api-service/src/main/java/com/xy/service/DeviceDataServiceImpl.java

@@ -57,7 +57,7 @@ public class DeviceDataServiceImpl extends ServiceImpl<DeviceDataMapper, DeviceD
 
     @PostMapping("page")
     @ApiOperation("分页查询")
-    private R<PageBean<DeviceDataDto.Vo>> page(@RequestBody DeviceDataDto.Page page) {
+    public R<PageBean<DeviceDataDto.Vo>> page(@RequestBody DeviceDataDto.Page page) {
         PageBean pageBean = page.getPage();
         LambdaQueryWrapper<DeviceData> lambdaQueryWrapper = new MybatisPlusQuery().eqWrapper(page, DeviceData.class)
                 .ge(DeviceData::getCreateTime, page.getBeginCreateTime())

+ 3 - 2
device-api-service/src/main/java/com/xy/service/MqttAclServiceImpl.java

@@ -10,6 +10,7 @@ import com.xy.mapper.MqttAclMapper;
 import com.xy.utils.MybatisPlusQuery;
 import com.xy.utils.PageBean;
 import com.xy.utils.R;
+import com.xy.utils.consts.CommConsts;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import lombok.AllArgsConstructor;
@@ -33,7 +34,7 @@ import static com.xy.utils.PlusBeans.toPageBean;
  */
 @Service
 @AllArgsConstructor
-@Api(tags = "")
+@Api(tags = "mqtt授权")
 public class MqttAclServiceImpl extends ServiceImpl<MqttAclMapper, MqttAcl> implements MqttAclService {
 
     private XyMqttConfig xyMqttConfig;
@@ -52,7 +53,7 @@ public class MqttAclServiceImpl extends ServiceImpl<MqttAclMapper, MqttAcl> impl
     public R save(@RequestBody @Validated MqttAclDto.Save save) {
         MqttAcl saveInfo = copy(MqttAcl.class, save)
                 .setIpaddress(xyMqttConfig.getUrl())
-                .setTopic(save.getClientid() + "-CMD");
+                .setTopic(save.getClientid() + CommConsts.DEVICE_MQTT_TOPIC_SUFFIX);
         save(saveInfo);
         return R.ok();
     }

+ 11 - 10
device-api-service/src/main/java/com/xy/service/MqttServiceImpl.java

@@ -5,6 +5,7 @@ import cn.easyes.core.conditions.LambdaEsQueryWrapper;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import com.github.yitter.idgen.YitIdHelper;
+import com.xy.annotate.Timer;
 import com.xy.collections.map.JConcurrentHashMap;
 import com.xy.collections.map.JMap;
 import com.xy.config.ThreadPoolConfig;
@@ -15,6 +16,7 @@ import com.xy.entity.MqttCommand;
 import com.xy.mapper.MqttCommandMapper;
 import com.xy.producer.MqttProducer;
 import com.xy.utils.*;
+import com.xy.utils.consts.CommConsts;
 import com.xy.utils.enums.MqttCommandStatusEnum;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
@@ -40,31 +42,32 @@ public class MqttServiceImpl implements MqttService {
 
     private MqttCommandMapper mqttCommandMapper;
 
+    @Timer
     @Override
     @ApiOperation("指令发布")
     public Map<String, Boolean> sendMqtt(List<MqttDto> mqttDtos) {
+        String time = DataTime.getSring();
         JMap<String, Boolean> map = new JConcurrentHashMap<>(mqttDtos.size());
         ThreadPoolUtils.Execute execute = ThreadPoolUtils.excPoll(ThreadPoolConfig.SEND_MQTT_POLL, mqttDtos.size());
         mqttDtos.forEach(mqttDto -> execute.execute(() -> {
             //给予默认值
             PaterDto pater = mqttDto.getPater()
                     .setSn(String.valueOf(YitIdHelper.nextId()))
-                    .setTime(DataTime.getSring());
+                    .setTime(time)
+                    .setDeviceId(mqttDto.getDeviceId());
             //延迟发布处理
             String delayTime = mqttDto.getDelayTime();
-            String topic = mqttDto.getTopic();
+            String topic = mqttDto.getDeviceId().toString(); //+ CommConsts.DEVICE_MQTT_TOPIC_SUFFIX;
             if (!StringUtils.isEmpty(delayTime)) {
                 //消息消费时间 - 发布时间的差值 = 距今被消费的秒数
                 long m = DataTime.diff(DataTime.toLocal(pater.getTime()), DataTime.toLocal(delayTime), "s");
-                topic = String.format("$delayed/%d/%s", m, mqttDto.getTopic());
+                topic = String.format("$delayed/%d/%s", m, topic);
                 mqttDto.setTimeout(mqttDto.getTimeout() + (int) m);
             }
-            //处理消息过期时间
-            pater.setTimeout(DataTime.getStringAround(0, 0, 0, 0, 0, mqttDto.getTimeout()));
+            //消息过期时间
+            pater.setTimeout(mqttDto.getTimeout());
             //翻译枚举
-            JSONObject value = JSONUtil.parseObj(pater)
-                    .set(LambdaUtils.getProperty(PaterDto::getType1), pater.getType1().getKey())
-                    .set(LambdaUtils.getProperty(PaterDto::getType2), pater.getType2().getKey());
+            JSONObject value = JSONUtil.parseObj(pater);
             String wkSn = pater.getWkSn();
             try {
                 //发送消息
@@ -133,8 +136,6 @@ public class MqttServiceImpl implements MqttService {
         PaterDto pater = mqttDto.getPater();
         MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater)
                 .setId(pater.getSn())
-                .setType1(pater.getType1().getKey())
-                .setType2(pater.getType2().getKey())
                 .setStatus(Integer.parseInt(status.getKey()))
                 .setValue(value.toString())
                 .setSendTime(pater.getTime())

+ 10 - 0
device-api-service/src/main/java/com/xy/utils/consts/CommConsts.java

@@ -0,0 +1,10 @@
+package com.xy.utils.consts;
+
+public class CommConsts {
+
+    /**
+     * 设备mqtt的topic尾缀
+     */
+    public static final String DEVICE_MQTT_TOPIC_SUFFIX  = "-CMD";
+
+}

+ 3 - 3
device-api/src/main/java/com/xy/dto/MqttDto.java

@@ -16,9 +16,9 @@ import java.time.LocalDateTime;
 @Accessors(chain = true)
 public class MqttDto {
 
-    @NotBlank(message = "topic不能为空")
-    @ApiModelProperty(value = "设备topic", required = true)
-    private String topic;
+    @NotBlank(message = "deviceId不能为空")
+    @ApiModelProperty(value = "设备id", required = true)
+    private Long deviceId;
 
     @ApiModelProperty("是否调试模式 默认=false")
     private Boolean debug = false;

+ 7 - 6
device-api/src/main/java/com/xy/dto/PaterDto.java

@@ -1,8 +1,6 @@
 package com.xy.dto;
 
 import cn.hutool.json.JSONObject;
-import com.xy.utils.enums.MqttType1Enum;
-import com.xy.utils.enums.MqttType2Enum;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.experimental.Accessors;
@@ -17,11 +15,14 @@ public class PaterDto {
     @ApiModelProperty(value = "业务系统sn编号", required = true)
     private String wkSn;
 
+    @ApiModelProperty(value = "设备id", hidden = true)
+    private Long deviceId;
+
     @ApiModelProperty(value = "大类", required = true)
-    private MqttType1Enum type1;
+    private String type1;
 
     @ApiModelProperty(value = "小类", required = true)
-    private MqttType2Enum type2;
+    private String type2;
 
     @ApiModelProperty("是否应答 默认=true")
     private Boolean ack = true;
@@ -29,8 +30,8 @@ public class PaterDto {
     @ApiModelProperty(value = "指令发送时间", hidden = true)
     private String time;
 
-    @ApiModelProperty(value = "指令过期时间", hidden = true)
-    private String timeout;
+    @ApiModelProperty(value = "指令有效期,单位:s", hidden = true)
+    private Integer timeout;
 
     @ApiModelProperty("业务方json数据")
     private JSONObject data;