package com.xy.service; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.xy.config.DeviceThreadPoolConfig; import com.xy.device.EnumDeviceOnlineStatus; import com.xy.dto.*; import com.xy.dto.be.MercDto; import com.xy.dto.be.MercUserDeviceDto; import com.xy.entity.*; import com.xy.enums.ChannelType; import com.xy.enums.MsgConfigId; import com.xy.enums.MsgType; import com.xy.event.DeviceEvent; import com.xy.service.be.MercFeignService; import com.xy.utils.*; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.*; import java.util.stream.Collectors; @Slf4j @Service @AllArgsConstructor @Api(tags = "设备消费者") public class DeviceMqttConsumerImpl implements DeviceMqttConsumer { private MsgSendApiService msgSendApiService; private DeviceInfoServiceImpl deviceInfoService; private MercFeignService mercFeignService; private UserInfoService userInfoService; private MsgSysMonitorNoticeService msgSysMonitorNoticeService; private DeviceFaultLogServiceImpl deviceFaultLogService; private DeviceFaultInfoServiceImpl deviceFaultInfoService; @Override @ApiOperation("设备在线") @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL) public void connected(DeviceMqttDto.RequestParams requestParams) { Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class); netWork(requestParams, netState); } @Override @ApiOperation("设备离线") @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL) public void disconnect(DeviceMqttDto.RequestParams requestParams) { Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class); netWork(requestParams, netState); } @Override @ApiOperation("设备消息推送") public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) { DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class); if (deviceEventMsg != null) { Long deviceId = deviceEventMsg.getDeviceId(); Long mercId = deviceEventMsg.getMercId(); MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId))); String mercCode = merc.getMercCode(); LocalDateTime createTime = deviceEventMsg.getCreateTime(); String msg = deviceEventMsg.getMsg(); String code = deviceEventMsg.getCode(); SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(code); String expand = sysCodeConfigureRedis.getExpand(); if (StrUtil.isEmpty(expand)) { return; } String paterCode = sysCodeConfigureRedis.getPaterCode(); SysCodeConfigureRedis parentCodeConfig = SysCodeConfigureUtils.get(paterCode); String eventType = parentCodeConfig.getMsg(); String title = sysCodeConfigureRedis.getTitle(); //是否展示给商户 boolean expShow2Merc = SysCodeConfigureUtils.getExpShow2Merc(expand); //是否推送给商户 boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand); //是否推送时间段 boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand); Long configId = MsgConfigId.DEVICE_EXCEPTION.getId(); if (MercAuthUtils.sendMsgToCM(mercCode)) { configId = MsgConfigId.CM_DEVICE_EXCEPTION.getId(); } MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId))); List bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId))); if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) { List bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class); List sendList = new ArrayList<>(); /** * {{first.DATA}} * 设备名称:{{keyword1.DATA}} * 告警内容:{{keyword2.DATA}} * 发生时间:{{keyword3.DATA}} * {{remark.DATA}} * */ DeviceInfo deviceInfo = deviceInfoService.getById(deviceId); String deviceName = deviceInfo.getDeviceName(); if (StrUtil.isEmpty(deviceName)) { deviceName = deviceId + ""; } else { deviceName = deviceName + "(" + deviceId + ")"; } Long userInfoId = merc.getUserInfoId(); List userInfoIdList = new ArrayList<>(); userInfoIdList.add(userInfoId); List mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers( new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId))); if (CollUtil.isNotEmpty(mercUserDevices)) { mercUserDevices.forEach(mud -> { Long deviceIdUser = mud.getDeviceId(); if (deviceId == deviceIdUser) { userInfoIdList.add(mud.getUserId()); } }); } StringBuilder sbMsg = StrUtil.builder().append("设备") .append("【") .append(deviceName) .append("】") .append("发生故障:").append(msg).append(",发生时间:") .append(DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN)); //查询userinfo表的mpOpenId List userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList))); //配置了展示给商户才生成数据 if (expShow2Merc) { userInfoList.forEach(u -> { String name = u.getName(); //系统消息 MsgSysMonitorNoticeDto.Save msgSys = new MsgSysMonitorNoticeDto.Save(); msgSys.setUserId(u.getAuthorizeUserId()); msgSys.setMsgType(MsgType.TYPE6.getCode()).setContent(sbMsg.toString()) .setPubTime(LocalDateTime.now()).setPubState(2) .setSendTargets(name) .setPriority(msgConfig.getPriority()) .setTitle(eventType + "-" + title) .setEventCode(code) .setPubUserId(-1).setTitle(MsgType.TYPE6.getDescription()); msgSysMonitorNoticeService.save(msgSys); }); } for (MsgConfigTestDto.BizData b : bizDataList) { String channelType = b.getChannelType(); if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) { //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息, 查询微信公众号openID Set mpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid())) .map(UserInfoDto.Vo::getMpOpenid) .collect(Collectors.toSet()); if (CollUtil.isEmpty(mpOpenIds)) { continue; } //微信公众号 //扩展参数 MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam(); if (ObjectUtil.isNotEmpty(sysCodeConfigureRedis)) { String propose = sysCodeConfigureRedis.getPropose(); if (StrUtil.isNotEmpty(propose) && propose.contains("http")) { bizExtraParam.setUrl(propose); } } Map params = MapUtil.newHashMap(); if (MercAuthUtils.sendMsgToCM(mercCode)) { //设备编号:{{character_string2.DATA}} 故障类型:{{thing6.DATA}} 报障时间:{{time8.DATA}} params.put("character_string2", deviceId); params.put("thing6", msg); params.put("time8", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN)); } else { params.put("keyword1", deviceName); params.put("keyword2", msg); params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN)); } //商户管理员发 b.setReceivers(mpOpenIds); b.setTemplateParams(params); b.setExtraParam(bizExtraParam); sendList.add(b); } } //是否可推送条件 if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) { msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList)); } } } } /** * 设备故障 * * @param requestParams */ @Override public void deviceFault(DeviceMqttDto.RequestParams requestParams) { DeviceFaultLogDto.MqttData mqttData = JSONUtil.toBean(requestParams.getData(), DeviceFaultLogDto.MqttData.class); // log.info("设备故障监控事件消费,接收mqtt data:{}", JSONUtil.toJsonPrettyStr(mqttData)); if (mqttData != null) { DeviceEventMsgDto.Vo deviceEventMsg = mqttData.getDeviceEventMsg(); Long mercId = deviceEventMsg.getMercId(); String code = deviceEventMsg.getCode(); Long deviceId = deviceEventMsg.getDeviceId(); String msg = deviceEventMsg.getMsg(); //加入故障日志 DeviceFaultLog log = new DeviceFaultLog() .setMercId(mercId) .setDescribes(msg) .setEventCode(code) .setEventTime(deviceEventMsg.getCreateTime()) .setDeviceId(deviceId); deviceFaultLogService.save(log); List configList = mqttData.getConfigList(); // 按故障级别排序 降序 List configSortedList = configList.stream() .sorted(Comparator.comparing(DeviceFaultMonitorConfigDto.Vo::getFaultLevel).reversed()) .collect(Collectors.toList()); for (DeviceFaultMonitorConfigDto.Vo config : configSortedList) { Integer eventNum = config.getEventNum(); Long minuteNum = config.getMinuteNum(); Integer faultLevel = config.getFaultLevel(); Long monitorId = config.getMonitorId(); Boolean toAfterSale = BooleanUtil.isTrue(config.getToAfterSale()); // 获取当前时间 Date now = DateUtil.date(); // 计算最近n分钟的开始时间 Date minutesAgo = DateUtil.offsetMinute(now, -minuteNum.intValue()); //统计同类事件累计次数 Long count = deviceFaultLogService.count(Wrappers.lambdaQuery() .eq(DeviceFaultLog::getDeviceId, deviceId) .eq(DeviceFaultLog::getMercId, mercId) .eq(DeviceFaultLog::getEventCode, code) .ge(DeviceFaultLog::getEventTime, minutesAgo) ); //达到阈值次数 if (count != null && count.intValue() >= eventNum) { //故障表入库 //是否已存在 未解决的 DeviceFaultInfo deviceFaultInfo = deviceFaultInfoService.getOne(Wrappers.lambdaQuery() .eq(DeviceFaultInfo::getDeviceId, deviceId) .eq(DeviceFaultInfo::getMercId, mercId) .eq(DeviceFaultInfo::getEventCode, code) .eq(DeviceFaultInfo::getState, false) ); if (deviceFaultInfo != null) { Integer faultLevel1 = deviceFaultInfo.getFaultLevel(); if (faultLevel1.intValue() < faultLevel) { //小于,当前故障级别,更新 deviceFaultInfo.setFaultLevel(faultLevel); deviceFaultInfo.setToAfterSale(toAfterSale); deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel)); } //累加次数 deviceFaultInfo.setEventNum(deviceFaultInfo.getEventNum() + 1).setMonitorId(monitorId); deviceFaultInfoService.updateById(deviceFaultInfo); } else { deviceFaultInfo = new DeviceFaultInfo() .setMercId(mercId) .setDeviceId(deviceId) .setDescribes(msg) .setFaultLevel(faultLevel) .setEventCode(code) .setEventNum(count.intValue()) .setMonitorId(monitorId) .setToAfterSale(toAfterSale); deviceFaultInfoService.save(deviceFaultInfo); deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel)); } //给售后推送故障信息 if (toAfterSale.booleanValue()) { deviceFaultPushMsg(deviceEventMsg); } //已按最大故障级别排序,所以满足条件就退出循环 break; } } } } /** * 故障推送给所有售后 * * @param deviceEventMsg */ public void deviceFaultPushMsg(DeviceEventMsgDto.Vo deviceEventMsg) { if (deviceEventMsg != null) { Long deviceId = deviceEventMsg.getDeviceId(); Long mercId = deviceEventMsg.getMercId(); LocalDateTime createTime = deviceEventMsg.getCreateTime(); String msg = deviceEventMsg.getMsg(); String code = deviceEventMsg.getCode(); Long configId = MsgConfigId.DEVICE_EXCEPTION.getId(); MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId))); List bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId))); if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) { List bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class); List sendList = new ArrayList<>(); DeviceInfo deviceInfo = deviceInfoService.getById(deviceId); String deviceName = deviceInfo.getDeviceName(); if (StrUtil.isEmpty(deviceName)) { deviceName = deviceId + ""; } else { deviceName = deviceName + "(" + deviceId + ")"; } // 获取售后及售后创建的角色用户 UserInfoDto.SaleBackUsersVO saleBackUsersVO = R.feignCheckData(userInfoService.getSaleBackUsers()); List mpOpenIds = saleBackUsersVO.getMpOpenIds(); //开门超时事件增加商户推送 ERR0020 if ("ERR0020".equals(code)) { MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId))); Long userInfoId = merc.getUserInfoId(); List userInfoIdList = new ArrayList<>(); userInfoIdList.add(userInfoId); List mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers( new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId))); if (CollUtil.isNotEmpty(mercUserDevices)) { mercUserDevices.forEach(mud -> { Long deviceIdUser = mud.getDeviceId(); if (deviceId == deviceIdUser) { userInfoIdList.add(mud.getUserId()); } }); } List userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList))); if (CollUtil.isNotEmpty(userInfoList)) { Set mercMpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid())) .map(UserInfoDto.Vo::getMpOpenid) .collect(Collectors.toSet()); mpOpenIds.addAll(mercMpOpenIds); } } if (CollUtil.isNotEmpty(mpOpenIds)) { Set mpIds = new HashSet<>(mpOpenIds); for (MsgConfigTestDto.BizData b : bizDataList) { String channelType = b.getChannelType(); if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) { //微信公众号 //扩展参数 MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam(); Map params = MapUtil.newHashMap(); params.put("keyword1", deviceName); params.put("keyword2", msg); params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN)); //商户管理员发 b.setReceivers(mpIds); b.setTemplateParams(params); b.setExtraParam(bizExtraParam); sendList.add(b); } } } //是否可推送条件 if (CollUtil.isNotEmpty(sendList)) { msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList)); } } } } /** * 设备联网状态 * * @param requestParams * @param netState */ private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) { JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData()); String clientid = jsonObject.getStr("clientid"); Long deviceId; try { deviceId = Long.valueOf(clientid); } catch (Exception e) { return; } //发布事件 DeviceEvent.NetWorkEvent netWorkEvent = new DeviceEvent.NetWorkEvent() .setNetState(netState); netWorkEvent.setValue(deviceId); EventUtils.push(() -> netWorkEvent); } }