Browse Source

优化mqtt指令发送

李进 2 years ago
parent
commit
21d88ad274

+ 5 - 5
device-api-service/src/main/java/com/xy/config/ThreadPoolConfig.java

@@ -11,7 +11,7 @@ public class ThreadPoolConfig {
 
     public static final String COMMON_POLL = "commonPoll";
 
-    public static final String DEVICE_DATA_SAVE_OR_ACCUM = "deviceDataSaveOrAccum";
+    public static final String SEND_MQTT_POLL = "sendMqttPoll";
 
     /**
      * 公用线程池
@@ -28,13 +28,13 @@ public class ThreadPoolConfig {
     }
 
     /**
-     * 设备统计数据,添加/累加线程池
+     * 指令发布线程池
      */
     @DynamicTp
-    @Bean(DEVICE_DATA_SAVE_OR_ACCUM)
-    public ThreadPoolTaskExecutor deviceDataSaveOrAccum() {
+    @Bean(SEND_MQTT_POLL)
+    public ThreadPoolTaskExecutor sendMqttPoll() {
         return ThreadPoolUtils.newPoll()
-                .name(DEVICE_DATA_SAVE_OR_ACCUM)
+                .name(SEND_MQTT_POLL)
                 .coreSize(10)
                 .maxSize(200)
                 .keepAlive(60)

+ 3 - 7
device-api-service/src/main/java/com/xy/service/DeviceDataServiceImpl.java

@@ -5,11 +5,9 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.xy.annotation.Lock;
 import com.xy.collections.list.JArrayList;
-import com.xy.collections.list.JList;
 import com.xy.collections.map.JConcurrentHashMap;
 import com.xy.collections.map.JHashMap;
 import com.xy.collections.map.JMap;
-import com.xy.config.ThreadPoolConfig;
 import com.xy.dto.DeviceDataDto;
 import com.xy.entity.DeviceData;
 import com.xy.entity.SysDictRedis;
@@ -86,9 +84,8 @@ public class DeviceDataServiceImpl extends ServiceImpl<DeviceDataMapper, DeviceD
         JMap<String, DeviceData> typeMaps = new JArrayList<>(list).toMap(DeviceData::getType).cover();
         //新增或修改的多线程桶map集合
         JMap<String, DeviceData> dbMaps = new JConcurrentHashMap<>(map.size());
-        //多线程封装新增或修改对象
-        ThreadPoolUtils.Execute execute = ThreadPoolUtils.excPoll(ThreadPoolConfig.DEVICE_DATA_SAVE_OR_ACCUM, map.size());
-        map.forEach((type, sysDictRedis) -> execute.execute(() -> {
+        //封装新增或修改对象
+        map.forEach((type, sysDictRedis) -> {
             DeviceData deviceData = typeMaps.get(type);
             //添加
             if (deviceData == null) {
@@ -111,8 +108,7 @@ public class DeviceDataServiceImpl extends ServiceImpl<DeviceDataMapper, DeviceD
                     .setZeroCount(saveOrAccum.getZeroCount() != null ? saveOrAccum.getZeroCount() + deviceData.getZeroCount() : null)
                     .setDeviceFaultCount(saveOrAccum.getDeviceFaultCount() != null ? saveOrAccum.getDeviceFaultCount() + deviceData.getDeviceFaultCount() : null);
             dbMaps.put(type, updateDeviceData);
-        }));
-        execute.end();
+        });
         saveOrUpdateBatch(dbMaps.toList().value());
         return R.ok();
     }

+ 14 - 10
device-api-service/src/main/java/com/xy/service/MqttServiceImpl.java

@@ -5,13 +5,15 @@ import cn.easyes.core.conditions.LambdaEsQueryWrapper;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import com.github.yitter.idgen.YitIdHelper;
+import com.xy.collections.map.JConcurrentHashMap;
+import com.xy.collections.map.JMap;
+import com.xy.config.ThreadPoolConfig;
 import com.xy.dto.BackMqttDto;
 import com.xy.dto.MqttDto;
 import com.xy.dto.PaterDto;
 import com.xy.entity.MqttCommand;
 import com.xy.mapper.MqttCommandMapper;
 import com.xy.producer.MqttProducer;
-import com.xy.service.MqttService;
 import com.xy.utils.*;
 import com.xy.utils.enums.MqttCommandStatusEnum;
 import io.swagger.annotations.Api;
@@ -23,8 +25,8 @@ import org.springframework.util.StringUtils;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static com.xy.utils.EsBeans.toPageBean;
 
@@ -40,9 +42,10 @@ public class MqttServiceImpl implements MqttService {
 
     @Override
     @ApiOperation("指令发布")
-    public List<MqttDto.Vo> sendMqtt(List<MqttDto> mqttDtos) {
-        List<MqttDto.Vo> list = new ArrayList<>(mqttDtos.size());
-        mqttDtos.forEach(mqttDto -> {
+    public Map<String, Boolean> sendMqtt(List<MqttDto> mqttDtos) {
+        JMap<String, Boolean> map = new JConcurrentHashMap<>(mqttDtos.size());
+        ThreadPoolUtils.Execute execute = ThreadPoolUtils.excPoll(ThreadPoolConfig.SEND_MQTT_POLL, mqttDtos.size());
+        mqttDtos.forEach(mqttDto -> execute.execute(() -> {
             //给予默认值
             PaterDto pater = mqttDto.getPater()
                     .setSn(String.valueOf(YitIdHelper.nextId()))
@@ -68,14 +71,15 @@ public class MqttServiceImpl implements MqttService {
                 mqttProducer.sendToMqtt(value.toString(), topic, mqttDto.getLevel());
                 //指令记录
                 saveMqttCommand(mqttDto, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_SUCCESS, null);
-                list.add(new MqttDto.Vo().setWkSn(wkSn).setResult(true));
+                map.put(wkSn, true);
             } catch (Exception e) {
                 //指令记录
                 saveMqttCommand(mqttDto, value, MqttCommandStatusEnum.COMMAND_SEND_FAIL, e.getMessage());
-                list.add(new MqttDto.Vo().setWkSn(wkSn).setResult(false));
+                map.put(wkSn, false);
             }
-        });
-        return list;
+        }));
+        execute.end();
+        return map;
     }
 
     @Override
@@ -115,7 +119,7 @@ public class MqttServiceImpl implements MqttService {
         EsPageInfo<MqttCommand> mqttCommandEsPageInfo = mqttCommandMapper.pageQuery(lambdaEsQueryWrapper, (int) pageBean.getCurrent(), (int) pageBean.getSize());
         return R.ok(toPageBean(MqttDto.Vo2.class, mqttCommandEsPageInfo));
     }
-    
+
 
     /**
      * 新增指令记录

+ 0 - 11
device-api/src/main/java/com/xy/dto/MqttDto.java

@@ -35,17 +35,6 @@ public class MqttDto {
     @ApiModelProperty(value = "指令对象", required = true)
     private PaterDto pater;
 
-    @Data
-    @Accessors(chain = true)
-    public static class Vo {
-
-        @ApiModelProperty("业务系统sn编号")
-        private String wkSn;
-
-        @ApiModelProperty("结果值")
-        private Boolean result;
-    }
-
     @Data
     @Accessors(chain = true)
     public static class Page extends Vo2 {

+ 0 - 2
device-api/src/main/java/com/xy/dto/PaterDto.java

@@ -3,13 +3,11 @@ package com.xy.dto;
 import cn.hutool.json.JSONObject;
 import com.xy.utils.enums.MqttType1Enum;
 import com.xy.utils.enums.MqttType2Enum;
-import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.experimental.Accessors;
 
 @Data
-@ApiModel("指令通用父类")
 @Accessors(chain = true)
 public class PaterDto {
 

+ 2 - 1
device-api/src/main/java/com/xy/service/MqttService.java

@@ -9,6 +9,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * mqtt接口
@@ -23,7 +24,7 @@ public interface MqttService {
      * @return k=wkSn v=布尔结果
      */
     @PostMapping("sendMqtt")
-    List<MqttDto.Vo> sendMqtt(@RequestBody List<MqttDto> mqttDtos);
+    Map<String, Boolean> sendMqtt(@RequestBody List<MqttDto> mqttDtos);
 
     /**
      * 指令结果回执