package com.xy.service;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.xy.config.DeviceThreadPoolConfig;
import com.xy.consts.MercConstant;
import com.xy.device.EnumDeviceOnlineStatus;
import com.xy.dto.*;
import com.xy.dto.be.MercDto;
import com.xy.dto.be.MercUserDeviceDto;
import com.xy.entity.*;
import com.xy.enums.ChannelType;
import com.xy.enums.MsgConfigId;
import com.xy.enums.MsgType;
import com.xy.enums.SmsSceneEnum;
import com.xy.event.DeviceEvent;
import com.xy.service.be.MercFeignService;
import com.xy.utils.*;
import com.xy.utils.enums.DeviceErrorRecordTypesEnum;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;

/**
 * Consumes device MQTT messages: online/offline notifications, device event
 * pushes (system notice, WeChat official-account message, SMS) and device
 * fault monitoring.
 */
@Slf4j
@Service
@AllArgsConstructor
@Api(tags = "设备消费者")
public class DeviceMqttConsumerImpl implements DeviceMqttConsumer {

    private final AliSmsService aliSmsService;
    private final MsgSendApiService msgSendApiService;
    private final DeviceInfoServiceImpl deviceInfoService;
    private final MercFeignService mercFeignService;
    private final UserInfoService userInfoService;
    private final MsgSysMonitorNoticeService msgSysMonitorNoticeService;
    private final DeviceFaultLogServiceImpl deviceFaultLogService;
    private final DeviceFaultInfoServiceImpl deviceFaultInfoService;
    private final MercSmsSubscriptionService mercSmsSubscriptionService;
    private final MercSmsPackageService mercSmsService;

    /**
     * Handles a "device connected" message by publishing a network-state event
     * carrying the dictionary value configured for the CONNECTED status.
     *
     * @param requestParams MQTT payload; its data is expected to contain a {@code clientid}
     */
    @Override
    @ApiOperation("设备在线")
    @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
    public void connected(DeviceMqttDto.RequestParams requestParams) {
        Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(),
                EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
        netWork(requestParams, netState);
    }

    /**
     * Handles a "device disconnected" message by publishing a network-state event
     * carrying the dictionary value configured for the DISCONNECT status.
     *
     * @param requestParams MQTT payload; its data is expected to contain a {@code clientid}
     */
    @Override
    @ApiOperation("设备离线")
    @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
    public void disconnect(DeviceMqttDto.RequestParams requestParams) {
        Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(),
                EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
        netWork(requestParams, netState);
    }

    /**
     * Handles a device event message: optionally stores a system notice for each
     * recipient, optionally pushes a WeChat official-account template message, and
     * finally triggers the SMS notification flow.
     *
     * <p>Nothing is done when the event code has no configuration or its
     * {@code expand} field is empty.
     *
     * @param requestParams MQTT payload whose data deserializes to a {@link DeviceEventMsg}
     */
    @Override
    @ApiOperation("设备消息推送")
    public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) {
        log.info("设备消息推送MQTT:{}", JSONUtil.toJsonPrettyStr(requestParams));
        DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class);
        if (deviceEventMsg == null) {
            return;
        }
        Long deviceId = deviceEventMsg.getDeviceId();
        Long mercId = deviceEventMsg.getMercId();
        MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
        String mercCode = merc.getMercCode();
        LocalDateTime createTime = deviceEventMsg.getCreateTime();
        String msg = deviceEventMsg.getMsg();
        String code = deviceEventMsg.getCode();

        SysCodeConfigureRedis codeConfig = SysCodeConfigureUtils.get(code);
        // An unknown event code is treated like one without push settings.
        if (codeConfig == null || StrUtil.isEmpty(codeConfig.getExpand())) {
            return;
        }
        String expand = codeConfig.getExpand();
        // Whether the event is shown to the merchant.
        boolean expShow2Merc = SysCodeConfigureUtils.getExpShow2Merc(expand);
        // Whether the event is pushed to the merchant.
        boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand);
        // Whether pushing is currently allowed (push time window).
        boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand);

        boolean sendToCm = MercAuthUtils.sendMsgToCM(mercCode);
        Long configId = sendToCm
                ? MsgConfigId.CM_DEVICE_EXCEPTION.getId()
                : MsgConfigId.DEVICE_EXCEPTION.getId();

        DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
        MsgConfigDto.Vo msgConfig = R.feignCheckData(
                msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
        List<?> bizParams = R.feignCheckData(
                msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));

        if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
            List<MsgConfigTestDto.BizData> bizDataList =
                    BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
            /*
             * Template layout:
             * {{first.DATA}}
             * device name:   {{keyword1.DATA}}
             * alarm content: {{keyword2.DATA}}
             * occurred at:   {{keyword3.DATA}}
             * {{remark.DATA}}
             */
            String deviceName = buildDeviceDisplayName(deviceInfo, deviceId);
            String eventTime = DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN);
            String content = "设备" + "【" + deviceName + "】" + "发生故障:" + msg + ",发生时间:" + eventTime;

            // Recipients: the merchant admin plus users authorized on this device.
            List<Long> userInfoIdList = collectRecipientUserIds(merc, mercId, deviceId);
            List<UserInfoDto.Vo> fetchedUsers = R.feignCheckData(
                    userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
            List<UserInfoDto.Vo> userInfoList =
                    fetchedUsers == null ? Collections.<UserInfoDto.Vo>emptyList() : fetchedUsers;

            // System notices are only generated when the event is configured as visible to the merchant.
            if (expShow2Merc) {
                for (UserInfoDto.Vo user : userInfoList) {
                    MsgSysMonitorNoticeDto.Save notice = new MsgSysMonitorNoticeDto.Save();
                    notice.setUserId(user.getAuthorizeUserId());
                    // NOTE(review): the previous code first set the title to
                    // "<parent code msg>-<code title>" and then immediately overwrote it with
                    // MsgType.TYPE6.getDescription(); the effective (last) value is kept here.
                    // Confirm which title is actually intended.
                    notice.setMsgType(MsgType.TYPE6.getCode())
                            .setContent(content)
                            .setPubTime(LocalDateTime.now())
                            .setPubState(2)
                            .setSendTargets(user.getName())
                            .setPriority(msgConfig.getPriority())
                            .setEventCode(code)
                            .setPubUserId(-1)
                            .setTitle(MsgType.TYPE6.getDescription());
                    msgSysMonitorNoticeService.save(notice);
                }
            }

            // TODO: no subscription feature yet; the merchant admin and every user with
            // permission on this device receive the official-account message.
            Set<String> mpOpenIds = extractMpOpenIds(userInfoList);
            List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
            if (CollUtil.isNotEmpty(mpOpenIds)) {
                for (MsgConfigTestDto.BizData bizData : bizDataList) {
                    if (!isOfficialAccountChannel(bizData)) {
                        continue;
                    }
                    // Extra parameters: link to the handling proposal when it is a URL.
                    MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
                    String propose = codeConfig.getPropose();
                    if (StrUtil.isNotEmpty(propose) && propose.contains("http")) {
                        bizExtraParam.setUrl(propose);
                    }
                    Map<String, Object> params = MapUtil.newHashMap();
                    if (sendToCm) {
                        // device no: {{character_string2.DATA}} fault type: {{thing6.DATA}} reported at: {{time8.DATA}}
                        params.put("character_string2", deviceId);
                        params.put("thing6", msg);
                        params.put("time8", eventTime);
                    } else {
                        params.put("keyword1", deviceName);
                        params.put("keyword2", msg);
                        params.put("keyword3", eventTime);
                    }
                    bizData.setReceivers(mpOpenIds);
                    bizData.setTemplateParams(params);
                    bizData.setExtraParam(bizExtraParam);
                    sendList.add(bizData);
                }
            }

            // Push only when there is something to send and the event configuration allows it.
            if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) {
                msgSendApiService.sendByMsgConfig(
                        new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
            }
        }

        // SMS notification.
        log.info("设备告警,短信通知处理开始。。。");
        deviceSmsPushMsg(merc, deviceInfo, createTime, code);
    }

    /**
     * Sends the merchant an SMS about a device exception when the merchant has
     * subscribed to the event code, the event code maps to a known exception type,
     * the merchant user has a phone number and the SMS package deduction succeeds.
     *
     * @param merc       merchant owning the device
     * @param deviceInfo device that raised the event
     * @param createTime time the event occurred
     * @param eventCode  event code used for the subscription lookup and type mapping
     */
    public void deviceSmsPushMsg(MercDto.Vo merc, DeviceInfo deviceInfo, LocalDateTime createTime, String eventCode) {
        if (merc == null || deviceInfo == null) {
            log.warn("设备告警短信通知:商户或设备信息为空,跳过,eventCode:{}", eventCode);
            return;
        }
        Long deviceId = deviceInfo.getDeviceId();
        String deviceName = buildDeviceDisplayName(deviceInfo, deviceId);
        String exceptionType = resolveExceptionType(eventCode);

        // Has the merchant subscribed to SMS for this event code?
        Boolean subscribedFlag = R.feignCheckData(mercSmsSubscriptionService.isSubscribedByCode(
                new MercSmsSubscriptionDto.Vo().setMercId(merc.getId()).setEventCode(eventCode)));
        log.info("设备告警,商户是否订阅短信通知:{},eventCode:{}", subscribedFlag, eventCode);
        if (!BooleanUtil.isTrue(subscribedFlag) || StrUtil.isEmpty(exceptionType)) {
            return;
        }

        UserInfoDto.Vo userInfo = R.feignCheckData(
                userInfoService.obj(new UserInfoDto.Vo().setUserId(merc.getUserInfoId())));
        if (userInfo == null || StrUtil.isEmpty(userInfo.getTel())) {
            return;
        }
        String tel = userInfo.getTel();

        // SMS template: device ${device_id} had a ${exception_type} exception at ${time}.
        Long configId = MsgConfigId.SMS_NOTIFY_DEVICE.getId();
        String formatDate = LocalDateTimeUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN);
        // NOTE(review): left as a raw Map because the parameter type of
        // SmsSendCommonDTO#setTemplateParams is not visible from this file.
        Map templateParams = MapUtil.newHashMap();
        templateParams.put("device_id", deviceName);
        templateParams.put("time", formatDate);
        templateParams.put("exception_type", exceptionType);
        log.info("商户设备告警短信通知:{}", JSONUtil.toJsonPrettyStr(templateParams));

        // Deduct from the merchant's SMS package before sending.
        R<?> deduct = mercSmsService.deduct(new MercSmsPackageDto.Deduct()
                .setMercId(merc.getId())
                .setMsg("设备告警短信通知,设备号:" + deviceId));
        // NOTE(review): if both getCode() calls return boxed Integers this is a reference
        // comparison; the declared types are not visible here, so it is left unchanged.
        if (deduct.getCode() == R.Enum.SUCCESS.getCode()) {
            R.feignCheck(aliSmsService.sendMsgCommon(new SmsSendCommonDTO()
                    .setSignName("")
                    .setMsgConfigId(configId)
                    .setMobile(tel)
                    .setTemplateParams(templateParams)
                    .setScene(SmsSceneEnum.B_DEVICE_ALARM_NOTIFY.getScene())));
        } else {
            // Second placeholder previously had no argument; report the deduct result code.
            log.info("设备告警短信通知#商户{}短信资源包不足,无法发送短信通知:{}", merc.getId(), deduct.getCode());
        }
    }

    /**
     * Handles a device fault monitoring message: appends a fault log entry, then walks
     * the monitor configurations from highest to lowest fault level and, for the first
     * one whose event-count threshold within its time window is reached, creates or
     * updates the unresolved fault record and optionally notifies after-sales.
     *
     * @param requestParams MQTT payload whose data deserializes to {@link DeviceFaultLogDto.MqttData}
     */
    @Override
    public void deviceFault(DeviceMqttDto.RequestParams requestParams) {
        DeviceFaultLogDto.MqttData mqttData =
                JSONUtil.toBean(requestParams.getData(), DeviceFaultLogDto.MqttData.class);
        if (mqttData == null) {
            return;
        }
        DeviceEventMsgDto.Vo deviceEventMsg = mqttData.getDeviceEventMsg();
        if (deviceEventMsg == null) {
            return;
        }
        Long mercId = deviceEventMsg.getMercId();
        String code = deviceEventMsg.getCode();
        Long deviceId = deviceEventMsg.getDeviceId();
        String msg = deviceEventMsg.getMsg();

        // Append to the fault log. (Local is not named "log" so the logger is not shadowed.)
        DeviceFaultLog faultLog = new DeviceFaultLog()
                .setMercId(mercId)
                .setDescribes(msg)
                .setEventCode(code)
                .setEventTime(deviceEventMsg.getCreateTime())
                .setDeviceId(deviceId);
        deviceFaultLogService.save(faultLog);

        List<DeviceFaultMonitorConfigDto.Vo> configList = mqttData.getConfigList();
        if (CollUtil.isEmpty(configList)) {
            return;
        }
        // Sort by fault level, descending.
        List<DeviceFaultMonitorConfigDto.Vo> configSortedList = configList.stream()
                .sorted(Comparator.comparing(DeviceFaultMonitorConfigDto.Vo::getFaultLevel).reversed())
                .collect(Collectors.toList());

        for (DeviceFaultMonitorConfigDto.Vo config : configSortedList) {
            Integer eventNum = config.getEventNum();
            Long minuteNum = config.getMinuteNum();
            Integer faultLevel = config.getFaultLevel();
            Long monitorId = config.getMonitorId();
            boolean toAfterSale = BooleanUtil.isTrue(config.getToAfterSale());

            // Start of the "last n minutes" window.
            Date windowStart = DateUtil.offsetMinute(DateUtil.date(), -minuteNum.intValue());
            // Number of same-code events for this device/merchant inside the window.
            Long count = deviceFaultLogService.count(Wrappers.<DeviceFaultLog>lambdaQuery()
                    .eq(DeviceFaultLog::getDeviceId, deviceId)
                    .eq(DeviceFaultLog::getMercId, mercId)
                    .eq(DeviceFaultLog::getEventCode, code)
                    .ge(DeviceFaultLog::getEventTime, windowStart));
            if (count == null || count.intValue() < eventNum) {
                continue;
            }

            // Threshold reached: look for an existing unresolved fault record.
            DeviceFaultInfo deviceFaultInfo = deviceFaultInfoService.getOne(Wrappers.<DeviceFaultInfo>lambdaQuery()
                    .eq(DeviceFaultInfo::getDeviceId, deviceId)
                    .eq(DeviceFaultInfo::getMercId, mercId)
                    .eq(DeviceFaultInfo::getEventCode, code)
                    .eq(DeviceFaultInfo::getState, false));
            if (deviceFaultInfo != null) {
                Integer recordedLevel = deviceFaultInfo.getFaultLevel();
                if (recordedLevel.intValue() < faultLevel) {
                    // Recorded level is lower than the current one: escalate.
                    deviceFaultInfo.setFaultLevel(faultLevel);
                    deviceFaultInfo.setToAfterSale(toAfterSale);
                    deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
                }
                // Accumulate the occurrence count.
                deviceFaultInfo.setEventNum(deviceFaultInfo.getEventNum() + 1).setMonitorId(monitorId);
                deviceFaultInfoService.updateById(deviceFaultInfo);
            } else {
                deviceFaultInfo = new DeviceFaultInfo()
                        .setMercId(mercId)
                        .setDeviceId(deviceId)
                        .setDescribes(msg)
                        .setFaultLevel(faultLevel)
                        .setEventCode(code)
                        .setEventNum(count.intValue())
                        .setMonitorId(monitorId)
                        .setToAfterSale(toAfterSale);
                deviceFaultInfoService.save(deviceFaultInfo);
                deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
            }

            // Push the fault to after-sales when configured.
            if (toAfterSale) {
                deviceFaultPushMsg(deviceEventMsg);
            }
            // Configurations are ordered by highest level first, so stop at the first match.
            break;
        }
    }

    /**
     * Pushes a fault to all after-sales users through the WeChat official-account
     * channel. For event code {@code ERR0020} the merchant admin and the users
     * authorized on the device are added as recipients as well.
     *
     * @param deviceEventMsg fault event; nothing is done when {@code null}
     */
    public void deviceFaultPushMsg(DeviceEventMsgDto.Vo deviceEventMsg) {
        if (deviceEventMsg == null) {
            return;
        }
        Long deviceId = deviceEventMsg.getDeviceId();
        Long mercId = deviceEventMsg.getMercId();
        LocalDateTime createTime = deviceEventMsg.getCreateTime();
        String msg = deviceEventMsg.getMsg();
        String code = deviceEventMsg.getCode();

        Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
        MsgConfigDto.Vo msgConfig = R.feignCheckData(
                msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
        List<?> bizParams = R.feignCheckData(
                msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
        if (CollUtil.isEmpty(bizParams) || msgConfig == null) {
            return;
        }
        List<MsgConfigTestDto.BizData> bizDataList =
                BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);

        DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
        String deviceName = buildDeviceDisplayName(deviceInfo, deviceId);

        // After-sales users and the role users they created. Collected into a fresh set so
        // a null or unmodifiable list from the remote call cannot break the merge below.
        Set<String> receivers = new HashSet<>();
        UserInfoDto.SaleBackUsersVO saleBackUsersVO = R.feignCheckData(userInfoService.getSaleBackUsers());
        if (saleBackUsersVO != null) {
            List<String> saleBackOpenIds = saleBackUsersVO.getMpOpenIds();
            if (CollUtil.isNotEmpty(saleBackOpenIds)) {
                receivers.addAll(saleBackOpenIds);
            }
        }

        // Door-open-timeout events (ERR0020) are additionally pushed to the merchant side.
        if ("ERR0020".equals(code)) {
            MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
            List<Long> userInfoIdList = collectRecipientUserIds(merc, mercId, deviceId);
            List<UserInfoDto.Vo> userInfoList = R.feignCheckData(
                    userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
            receivers.addAll(extractMpOpenIds(userInfoList));
        }
        if (receivers.isEmpty()) {
            return;
        }

        String eventTime = DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN);
        List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
        for (MsgConfigTestDto.BizData bizData : bizDataList) {
            if (!isOfficialAccountChannel(bizData)) {
                continue;
            }
            Map<String, Object> params = MapUtil.newHashMap();
            params.put("keyword1", deviceName);
            params.put("keyword2", msg);
            params.put("keyword3", eventTime);
            bizData.setReceivers(receivers);
            bizData.setTemplateParams(params);
            bizData.setExtraParam(new MsgConfigTestDto.BizExtraParam());
            sendList.add(bizData);
        }

        if (CollUtil.isNotEmpty(sendList)) {
            msgSendApiService.sendByMsgConfig(
                    new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
        }
    }

    /**
     * Publishes a device network-state event for the device identified by the
     * {@code clientid} field of the payload. Payloads whose {@code clientid} is not
     * a number are ignored.
     *
     * @param requestParams MQTT payload containing {@code clientid}
     * @param netState      network state value to publish
     */
    private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) {
        JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
        String clientid = jsonObject.getStr("clientid");
        Long deviceId;
        try {
            deviceId = Long.valueOf(clientid);
        } catch (NumberFormatException ignored) {
            // Missing or non-numeric client ids are skipped on purpose; leave a trace for diagnosis.
            log.debug("设备联网状态:clientid不是设备编号,忽略,clientid:{}", clientid);
            return;
        }
        // Publish the event.
        DeviceEvent.NetWorkEvent netWorkEvent = new DeviceEvent.NetWorkEvent()
                .setNetState(netState);
        netWorkEvent.setValue(deviceId);
        EventUtils.push(() -> netWorkEvent);
    }

    /** Builds the device label: "name(id)" when a name is available, otherwise just the id. */
    private static String buildDeviceDisplayName(DeviceInfo deviceInfo, Long deviceId) {
        String deviceName = deviceInfo == null ? null : deviceInfo.getDeviceName();
        if (StrUtil.isEmpty(deviceName)) {
            return String.valueOf(deviceId);
        }
        return deviceName + "(" + deviceId + ")";
    }

    /** Collects the merchant admin's user id plus the ids of users authorized on the given device. */
    private List<Long> collectRecipientUserIds(MercDto.Vo merc, Long mercId, Long deviceId) {
        List<Long> userInfoIdList = new ArrayList<>();
        userInfoIdList.add(merc.getUserInfoId());
        List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
                new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
        if (CollUtil.isNotEmpty(mercUserDevices)) {
            for (MercUserDeviceDto.Vo mercUserDevice : mercUserDevices) {
                // Boxed Long values must be compared by value, not by reference.
                if (Objects.equals(deviceId, mercUserDevice.getDeviceId())) {
                    userInfoIdList.add(mercUserDevice.getUserId());
                }
            }
        }
        return userInfoIdList;
    }

    /** Returns the distinct non-empty official-account open ids of the given users; empty for null/empty input. */
    private static Set<String> extractMpOpenIds(List<UserInfoDto.Vo> users) {
        if (CollUtil.isEmpty(users)) {
            return new HashSet<>();
        }
        return users.stream()
                .filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
                .map(UserInfoDto.Vo::getMpOpenid)
                .collect(Collectors.toSet());
    }

    /** Tells whether the biz data targets the WeChat official-account channel. */
    private static boolean isOfficialAccountChannel(MsgConfigTestDto.BizData bizData) {
        return Integer.parseInt(bizData.getChannelType()) == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue();
    }

    /** Maps an event code to the exception type text used in the SMS; empty string when unmapped. */
    private static String resolveExceptionType(String eventCode) {
        if (DeviceErrorRecordTypesEnum.T.getCode().equals(eventCode)) {
            return "温度";
        }
        if (DeviceErrorRecordTypesEnum.NET.getCode().equals(eventCode)) {
            return "网络";
        }
        if (DeviceErrorRecordTypesEnum.DOOR_LOCK.getCode().equals(eventCode)) {
            return "门锁";
        }
        if (DeviceErrorRecordTypesEnum.LIGHT.getCode().equals(eventCode)) {
            return "灯光";
        }
        if (DeviceErrorRecordTypesEnum.CAMERA.getCode().equals(eventCode)) {
            return "摄像头";
        }
        if (DeviceErrorRecordTypesEnum.COMPRESSOR.getCode().equals(eventCode)) {
            return "压缩机";
        }
        log.info("设备告警短信通知:异常类型未配置,eventCode{}", eventCode);
        return "";
    }
}