소스 검색

mqtt消费者改版

李进 1 년 전
부모
커밋
ee77c4f6e9

+ 11 - 13
device-api-service/src/main/java/com/xy/consumer/connected/ConnectedConsumer.java

@@ -2,6 +2,7 @@ package com.xy.consumer.connected;
 
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
+import com.xy.annotate.MqttConsumerConfigure;
 import com.xy.config.DeviceThreadPoolConfig;
 import com.xy.consumer.MqttConsumer;
 import com.xy.device.EnumDeviceOnlineStatus;
@@ -10,7 +11,6 @@ import com.xy.dto.DeviceStatusDto;
 import com.xy.service.DeviceNetRecordServiceImpl;
 import com.xy.service.DeviceStatusServiceImpl;
 import com.xy.utils.SysDictUtils;
-import com.xy.utils.ThreadPoolUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -26,6 +26,7 @@ import org.springframework.stereotype.Service;
 @Slf4j
 @Service
 @AllArgsConstructor
+@MqttConsumerConfigure(threadPoll = DeviceThreadPoolConfig.DEVICE_NETWORK_POLL, isFailSaveEs = false)
 public class ConnectedConsumer implements MqttConsumer {
 
     private DeviceStatusServiceImpl deviceStatusService;
@@ -47,18 +48,15 @@ public class ConnectedConsumer implements MqttConsumer {
         } catch (Exception e) {
             return true;
         }
-        ThreadPoolUtils.excPoll(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL, 1)
-                .execute(() -> {
-                    //上报状态
-                    DeviceStatusDto.Up up = new DeviceStatusDto.Up().setDeviceId(deviceId);
-                    up.setNetState(netState);
-                    deviceStatusService.up(up).getCode();
-                    //添加联网记录
-                    DeviceNetRecordDto.Save save = new DeviceNetRecordDto.Save()
-                            .setDeviceId(deviceId);
-                    save.setNetStatus(netState);
-                    deviceNetRecordService.save(save);
-                });
+        //上报状态
+        DeviceStatusDto.Up up = new DeviceStatusDto.Up().setDeviceId(deviceId);
+        up.setNetState(netState);
+        deviceStatusService.up(up).getCode();
+        //添加联网记录
+        DeviceNetRecordDto.Save save = new DeviceNetRecordDto.Save()
+                .setDeviceId(deviceId);
+        save.setNetStatus(netState);
+        deviceNetRecordService.save(save);
         return true;
     }
 }

+ 43 - 46
device-api-service/src/main/java/com/xy/consumer/data/DataConsumer.java

@@ -2,6 +2,7 @@ package com.xy.consumer.data;
 
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
+import com.xy.annotate.MqttConsumerConfigure;
 import com.xy.config.DeviceThreadPoolConfig;
 import com.xy.consumer.MqttConsumer;
 import com.xy.dto.DataMqttVo;
@@ -9,7 +10,6 @@ import com.xy.dto.DeviceDataDto;
 import com.xy.service.DeviceDataServiceImpl;
 import com.xy.service.DeviceStatusServiceImpl;
 import com.xy.utils.Emptys;
-import com.xy.utils.ThreadPoolUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -27,6 +27,7 @@ import java.util.List;
 @Slf4j
 @Service
 @AllArgsConstructor
+@MqttConsumerConfigure(threadPoll = DeviceThreadPoolConfig.DEVICE_DATA_POLL)
 public class DataConsumer implements MqttConsumer {
 
     private DeviceDataServiceImpl deviceDataService;
@@ -35,51 +36,47 @@ public class DataConsumer implements MqttConsumer {
 
     @Override
     public boolean message(String topic, String payload) {
-        ThreadPoolUtils.excPoll(DeviceThreadPoolConfig.DEVICE_DATA_POLL, 1)
-                .execute(() -> {
-                    JSONObject jsonObject = JSONUtil.parseObj(payload);
-                    DataMqttVo dataMqttVo = jsonObject.toBean(DataMqttVo.class);
-                    Boolean type = dataMqttVo.getType();
-                    List<DataMqttVo.Goods> goods = dataMqttVo.getGoods();
-                    Integer money = dataMqttVo.getMoney();
-                    int number = 0;
-                    if (Emptys.check(goods)) {
-                        for (DataMqttVo.Goods good : goods) {
-                            number = number + good.getNumber();
-                        }
-                    }
-                    //设备统计
-                    DeviceDataDto.SaveOrAccum saveOrAccum = new DeviceDataDto.SaveOrAccum()
-                            .setDeviceId(dataMqttVo.getDeviceId());
-                    saveOrAccum.setMercId(dataMqttVo.getMercId());
-                    if (type) {
-                        //购买
-                        if (dataMqttVo.getIsError()) {
-                            //异常单
-                            saveOrAccum.setRiskCount(1);
-                        } else {
-                            if (money == null || money == 0) {
-                                //0元单
-                                saveOrAccum.setZeroCount(1);
-                            } else {
-                                //正常单,累计商品数量
-                                saveOrAccum.setSalesMoney(money);
-                                saveOrAccum.setSalesCount(1);
-                                saveOrAccum.setGoodsCount(number);
-                            }
-                        }
-                    } else {
-                        //退款
-                        saveOrAccum.setRefundMoney(money);
-                        saveOrAccum.setRefundCount(1);
-                    }
-                    deviceDataService.saveOrAccum(saveOrAccum);
-                    //修改库存
-                    if (number > 0) {
-                        deviceStatusService.upStock(dataMqttVo.getDeviceId(), type ? (~(number - 1)) : number);
-                    }
-                })
-                .error(e -> log.error("统计消费者异常,{}", e));
+        JSONObject jsonObject = JSONUtil.parseObj(payload);
+        DataMqttVo dataMqttVo = jsonObject.toBean(DataMqttVo.class);
+        Boolean type = dataMqttVo.getType();
+        List<DataMqttVo.Goods> goods = dataMqttVo.getGoods();
+        Integer money = dataMqttVo.getMoney();
+        int number = 0;
+        if (Emptys.check(goods)) {
+            for (DataMqttVo.Goods good : goods) {
+                number = number + good.getNumber();
+            }
+        }
+        //设备统计
+        DeviceDataDto.SaveOrAccum saveOrAccum = new DeviceDataDto.SaveOrAccum()
+                .setDeviceId(dataMqttVo.getDeviceId());
+        saveOrAccum.setMercId(dataMqttVo.getMercId());
+        if (type) {
+            //购买
+            if (dataMqttVo.getIsError()) {
+                //异常单
+                saveOrAccum.setRiskCount(1);
+            } else {
+                if (money == null || money == 0) {
+                    //0元单
+                    saveOrAccum.setZeroCount(1);
+                } else {
+                    //正常单,累计商品数量
+                    saveOrAccum.setSalesMoney(money);
+                    saveOrAccum.setSalesCount(1);
+                    saveOrAccum.setGoodsCount(number);
+                }
+            }
+        } else {
+            //退款
+            saveOrAccum.setRefundMoney(money);
+            saveOrAccum.setRefundCount(1);
+        }
+        deviceDataService.saveOrAccum(saveOrAccum);
+        //修改库存
+        if (number > 0) {
+            deviceStatusService.upStock(dataMqttVo.getDeviceId(), type ? (~(number - 1)) : number);
+        }
         return true;
     }
 }

+ 3 - 1
device-api-service/src/main/java/com/xy/consumer/disconnect/DisconnectedConsumer.java

@@ -1,5 +1,7 @@
 package com.xy.consumer.disconnect;
 
+import com.xy.annotate.MqttConsumerConfigure;
+import com.xy.config.DeviceThreadPoolConfig;
 import com.xy.consumer.MqttConsumer;
 import com.xy.consumer.connected.ConnectedConsumer;
 import com.xy.device.EnumDeviceOnlineStatus;
@@ -19,13 +21,13 @@ import org.springframework.stereotype.Service;
 @Slf4j
 @Service
 @AllArgsConstructor
+@MqttConsumerConfigure(threadPoll = DeviceThreadPoolConfig.DEVICE_NETWORK_POLL, isFailSaveEs = false)
 public class DisconnectedConsumer implements MqttConsumer {
 
     private ConnectedConsumer connectedConsumer;
 
     @Override
     public boolean message(String topic, String payload) {
-        log.info("设备离线通知:{}", payload);
         Integer value = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
         return connectedConsumer.exc(payload, value);
     }