李进 1 год назад
Родитель
Сommit
458eba959f

+ 64 - 33
device-api-service/src/main/java/com/xy/consumer/device/push/msg/DevicePushMsgConsumer.java → device-api-service/src/main/java/com/xy/service/DeviceMqttConsumerImpl.java

@@ -1,4 +1,4 @@
-package com.xy.consumer.device.push.msg;
+package com.xy.service;
 
 import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.collection.CollUtil;
@@ -7,12 +7,11 @@ import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.map.MapUtil;
 import cn.hutool.core.util.ObjectUtil;
 import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
-import com.xy.consumer.MqttConsumer;
-import com.xy.dto.MsgConfigDto;
-import com.xy.dto.MsgConfigTestDto;
-import com.xy.dto.MsgSysMonitorNoticeDto;
-import com.xy.dto.UserInfoDto;
+import com.xy.config.DeviceThreadPoolConfig;
+import com.xy.device.EnumDeviceOnlineStatus;
+import com.xy.dto.*;
 import com.xy.dto.be.MercDto;
 import com.xy.dto.be.MercUserDeviceDto;
 import com.xy.entity.DeviceEventMsg;
@@ -20,15 +19,14 @@ import com.xy.entity.DeviceInfo;
 import com.xy.entity.SysCodeConfigureRedis;
 import com.xy.enums.ChannelType;
 import com.xy.enums.MsgType;
-import com.xy.service.DeviceInfoServiceImpl;
-import com.xy.service.MsgSendApiService;
-import com.xy.service.MsgSysMonitorNoticeService;
-import com.xy.service.UserInfoService;
 import com.xy.service.be.MercFeignService;
 import com.xy.utils.R;
 import com.xy.utils.SysCodeConfigureUtils;
+import com.xy.utils.SysDictUtils;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
 import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
@@ -38,29 +36,45 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-
-/**
- * 设备消息推送消费者
- *
- * @author 谭斌
- * @date 2023/06/06
- */
-@Slf4j
 @Service
 @AllArgsConstructor
-public class DevicePushMsgConsumer implements MqttConsumer {
+@Api(tags = "设备消费者")
+public class DeviceMqttConsumerImpl implements DeviceMqttConsumer {
+
+    private DeviceStatusServiceImpl deviceStatusService;
+
+    private DeviceNetRecordServiceImpl deviceNetRecordService;
 
     private MsgSendApiService msgSendApiService;
+
     private DeviceInfoServiceImpl deviceInfoService;
+
     private MercFeignService mercFeignService;
+
     private UserInfoService userInfoService;
+
     private MsgSysMonitorNoticeService msgSysMonitorNoticeService;
 
+    @Override
+    @ApiOperation("设备在线")
+    @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
+    public void connected(DeviceMqttDto.RequestParams requestParams) {
+        Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
+        netWork(requestParams, netState);
+    }
+
+    @Override
+    @ApiOperation("设备离线")
+    @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
+    public void disconnect(DeviceMqttDto.RequestParams requestParams) {
+        Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
+        netWork(requestParams, netState);
+    }
 
     @Override
-    public boolean message(String topic, String payload) {
-        log.info("-----设备故障消息推送-----:{}", payload);
-        DeviceEventMsg deviceEventMsg = JSONUtil.toBean(payload, DeviceEventMsg.class);
+    @ApiOperation("设备消息推送")
+    public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) {
+        DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class);
         if (deviceEventMsg != null) {
             Long deviceId = deviceEventMsg.getDeviceId();
             Long mercId = deviceEventMsg.getMercId();
@@ -70,7 +84,7 @@ public class DevicePushMsgConsumer implements MqttConsumer {
             SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(code);
             String expand = sysCodeConfigureRedis.getExpand();
             if (StrUtil.isEmpty(expand)) {
-                return true;
+                return;
             }
             String paterCode = sysCodeConfigureRedis.getPaterCode();
             SysCodeConfigureRedis parentCodeConfig = SysCodeConfigureUtils.get(paterCode);
@@ -82,7 +96,6 @@ public class DevicePushMsgConsumer implements MqttConsumer {
             boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand);
             //是否推送时间段
             boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand);
-
             Long configId = 12L;
             MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
             List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
@@ -104,7 +117,6 @@ public class DevicePushMsgConsumer implements MqttConsumer {
                 } else {
                     deviceName = deviceName + "(" + deviceId + ")";
                 }
-
                 MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
                 Long userInfoId = merc.getUserInfoId();
                 List<Long> userInfoIdList = new ArrayList<>();
@@ -144,21 +156,16 @@ public class DevicePushMsgConsumer implements MqttConsumer {
                         msgSysMonitorNoticeService.save(msgSys);
                     });
                 }
-
-
                 for (MsgConfigTestDto.BizData b : bizDataList) {
                     String channelType = b.getChannelType();
                     if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
                         //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息,  查询微信公众号openID
-
                         Set<String> mpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
                                 .map(UserInfoDto.Vo::getMpOpenid)
                                 .collect(Collectors.toSet());
-
                         if (CollUtil.isEmpty(mpOpenIds)) {
                             continue;
                         }
-
                         //微信公众号
                         //扩展参数
                         MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
@@ -184,10 +191,34 @@ public class DevicePushMsgConsumer implements MqttConsumer {
                 if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) {
                     msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
                 }
-
             }
         }
-        return true;
+    }
+
+    /**
+     * 设备联网状态
+     *
+     * @param requestParams
+     * @param netState
+     */
+    private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) {
+        JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
+        String clientid = jsonObject.getStr("clientid");
+        Long deviceId;
+        try {
+            deviceId = Long.valueOf(clientid);
+        } catch (Exception e) {
+            return;
+        }
+        //上报状态
+        DeviceStatusDto.Up up = new DeviceStatusDto.Up().setDeviceId(deviceId);
+        up.setNetState(netState);
+        deviceStatusService.up(up).getCode();
+        //添加联网记录
+        DeviceNetRecordDto.Save save = new DeviceNetRecordDto.Save()
+                .setDeviceId(deviceId);
+        save.setNetStatus(netState);
+        deviceNetRecordService.save(save);
     }
 
 }