|
- package com.xy.service;
- import cn.hutool.core.bean.BeanUtil;
- import cn.hutool.core.collection.CollUtil;
- import cn.hutool.core.date.DatePattern;
- import cn.hutool.core.date.DateUtil;
- import cn.hutool.core.date.LocalDateTimeUtil;
- import cn.hutool.core.map.MapUtil;
- import cn.hutool.core.util.BooleanUtil;
- import cn.hutool.core.util.ObjectUtil;
- import cn.hutool.core.util.StrUtil;
- import cn.hutool.json.JSONObject;
- import cn.hutool.json.JSONUtil;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import com.xy.config.DeviceThreadPoolConfig;
- import com.xy.consts.MercConstant;
- import com.xy.device.EnumDeviceOnlineStatus;
- import com.xy.dto.*;
- import com.xy.dto.be.MercDto;
- import com.xy.dto.be.MercUserDeviceDto;
- import com.xy.entity.*;
- import com.xy.enums.ChannelType;
- import com.xy.enums.MsgConfigId;
- import com.xy.enums.MsgType;
- import com.xy.enums.SmsSceneEnum;
- import com.xy.event.DeviceEvent;
- import com.xy.service.be.MercFeignService;
- import com.xy.utils.*;
- import com.xy.utils.enums.DeviceErrorRecordTypesEnum;
- import io.swagger.annotations.Api;
- import io.swagger.annotations.ApiOperation;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
- import java.time.LocalDateTime;
- import java.util.*;
- import java.util.stream.Collectors;
- @Slf4j
- @Service
- @AllArgsConstructor
- @Api(tags = "设备消费者")
- public class DeviceMqttConsumerImpl implements DeviceMqttConsumer {
- private AliSmsService aliSmsService;
- private MsgSendApiService msgSendApiService;
- private DeviceInfoServiceImpl deviceInfoService;
- private MercFeignService mercFeignService;
- private UserInfoService userInfoService;
- private MsgSysMonitorNoticeService msgSysMonitorNoticeService;
- private DeviceFaultLogServiceImpl deviceFaultLogService;
- private DeviceFaultInfoServiceImpl deviceFaultInfoService;
- private MercSmsSubscriptionService mercSmsSubscriptionService;
- private MercSmsPackageService mercSmsService;
- @Override
- @ApiOperation("设备在线")
- @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
- public void connected(DeviceMqttDto.RequestParams requestParams) {
- Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
- netWork(requestParams, netState);
- }
- @Override
- @ApiOperation("设备离线")
- @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
- public void disconnect(DeviceMqttDto.RequestParams requestParams) {
- Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
- netWork(requestParams, netState);
- }
- @Override
- @ApiOperation("设备消息推送")
- public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) {
- log.info("设备消息推送MQTT:{}", JSONUtil.toJsonPrettyStr(requestParams));
- DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class);
- if (deviceEventMsg != null) {
- Long deviceId = deviceEventMsg.getDeviceId();
- Long mercId = deviceEventMsg.getMercId();
- MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
- String mercCode = merc.getMercCode();
- LocalDateTime createTime = deviceEventMsg.getCreateTime();
- String msg = deviceEventMsg.getMsg();
- String code = deviceEventMsg.getCode();
- SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(code);
- String expand = sysCodeConfigureRedis.getExpand();
- if (StrUtil.isEmpty(expand)) {
- return;
- }
- String paterCode = sysCodeConfigureRedis.getPaterCode();
- SysCodeConfigureRedis parentCodeConfig = SysCodeConfigureUtils.get(paterCode);
- String eventType = parentCodeConfig.getMsg();
- String title = sysCodeConfigureRedis.getTitle();
- //是否展示给商户
- boolean expShow2Merc = SysCodeConfigureUtils.getExpShow2Merc(expand);
- //是否推送给商户
- boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand);
- //是否推送时间段
- boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand);
- Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
- if (MercAuthUtils.sendMsgToCM(mercCode)) {
- configId = MsgConfigId.CM_DEVICE_EXCEPTION.getId();
- }
- DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
- MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
- List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
- if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
- List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
- List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
- /**
- * {{first.DATA}}
- * 设备名称:{{keyword1.DATA}}
- * 告警内容:{{keyword2.DATA}}
- * 发生时间:{{keyword3.DATA}}
- * {{remark.DATA}}
- *
- */
- String deviceName = deviceInfo.getDeviceName();
- if (StrUtil.isEmpty(deviceName)) {
- deviceName = deviceId + "";
- } else {
- deviceName = deviceName + "(" + deviceId + ")";
- }
- Long userInfoId = merc.getUserInfoId();
- List<Long> userInfoIdList = new ArrayList<>();
- userInfoIdList.add(userInfoId);
- List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
- new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
- if (CollUtil.isNotEmpty(mercUserDevices)) {
- mercUserDevices.forEach(mud -> {
- Long deviceIdUser = mud.getDeviceId();
- if (Objects.equals(deviceId, deviceIdUser)) {
- userInfoIdList.add(mud.getUserId());
- }
- });
- }
- StringBuilder sbMsg = StrUtil.builder().append("设备")
- .append("【")
- .append(deviceName)
- .append("】")
- .append("发生故障:").append(msg).append(",发生时间:")
- .append(DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
- //查询userinfo表的mpOpenId
- List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
- //配置了展示给商户才生成数据
- if (expShow2Merc) {
- userInfoList.forEach(u -> {
- String name = u.getName();
- //系统消息
- MsgSysMonitorNoticeDto.Save msgSys = new MsgSysMonitorNoticeDto.Save();
- msgSys.setUserId(u.getAuthorizeUserId());
- msgSys.setMsgType(MsgType.TYPE6.getCode()).setContent(sbMsg.toString())
- .setPubTime(LocalDateTime.now()).setPubState(2)
- .setSendTargets(name)
- .setPriority(msgConfig.getPriority())
- .setTitle(eventType + "-" + title)
- .setEventCode(code)
- .setPubUserId(-1).setTitle(MsgType.TYPE6.getDescription());
- msgSysMonitorNoticeService.save(msgSys);
- });
- }
- for (MsgConfigTestDto.BizData b : bizDataList) {
- String channelType = b.getChannelType();
- if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
- //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息, 查询微信公众号openID
- Set<String> mpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
- .map(UserInfoDto.Vo::getMpOpenid)
- .collect(Collectors.toSet());
- if (CollUtil.isEmpty(mpOpenIds)) {
- continue;
- }
- //微信公众号
- //扩展参数
- MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
- if (ObjectUtil.isNotEmpty(sysCodeConfigureRedis)) {
- String propose = sysCodeConfigureRedis.getPropose();
- if (StrUtil.isNotEmpty(propose) && propose.contains("http")) {
- bizExtraParam.setUrl(propose);
- }
- }
- Map<String, Object> params = MapUtil.newHashMap();
- if (MercAuthUtils.sendMsgToCM(mercCode)) {
- //设备编号:{{character_string2.DATA}} 故障类型:{{thing6.DATA}} 报障时间:{{time8.DATA}}
- params.put("character_string2", deviceId);
- params.put("thing6", msg);
- params.put("time8", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
- } else {
- params.put("keyword1", deviceName);
- params.put("keyword2", msg);
- params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
- }
- //商户管理员发
- b.setReceivers(mpOpenIds);
- b.setTemplateParams(params);
- b.setExtraParam(bizExtraParam);
- sendList.add(b);
- }
- }
- //是否可推送条件
- if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) {
- msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
- }
- }
- //短信通知
- log.info("设备告警,短信通知处理开始。。。");
- deviceSmsPushMsg(merc,deviceInfo,createTime,code);
- }
- }
- /**
- * 商户设备异常短信通知
- * @param merc
- * @param deviceInfo
- * @param createTime
- * @param eventCode
- */
- public void deviceSmsPushMsg(MercDto.Vo merc,DeviceInfo deviceInfo,LocalDateTime createTime,String eventCode) {
- Long deviceId = deviceInfo.getDeviceId();
- String deviceName = deviceInfo.getDeviceName();
- if (StrUtil.isEmpty(deviceName)) {
- deviceName = deviceId + "";
- } else {
- deviceName = deviceName + "(" + deviceId + ")";
- }
- String exceptionType="";
- //商家是否订阅
- if(DeviceErrorRecordTypesEnum.T.getCode().equals(eventCode)) {
- exceptionType = "温度";
- }else if(DeviceErrorRecordTypesEnum.NET.getCode().equals(eventCode)){
- exceptionType="网络";
- }else if(DeviceErrorRecordTypesEnum.DOOR_LOCK.getCode().equals(eventCode)){
- exceptionType="门锁";
- }else if(DeviceErrorRecordTypesEnum.LIGHT.getCode().equals(eventCode)){
- exceptionType="灯光";
- }else if(DeviceErrorRecordTypesEnum.CAMERA.getCode().equals(eventCode)){
- exceptionType="摄像头";
- }else if(DeviceErrorRecordTypesEnum.COMPRESSOR.getCode().equals(eventCode)){
- exceptionType="压缩机";
- }else{
- log.info("设备告警短信通知:异常类型未配置,eventCode{}",eventCode);
- }
- //温度异常
- Boolean b = R.feignCheckData(mercSmsSubscriptionService.isSubscribedByCode(new MercSmsSubscriptionDto.Vo().setMercId(merc.getId()).setEventCode(eventCode)));
- log.info("设备告警,商户是否订阅短信通知:{},eventCode:{}",b,eventCode);
- boolean isSubscribed = BooleanUtil.isTrue(b);
- //商户是否订阅短信通知
- if(isSubscribed&&StrUtil.isNotEmpty(exceptionType)){
- UserInfoDto.Vo userInfo = R.feignCheckData(userInfoService.obj(new UserInfoDto.Vo().setUserId(merc.getUserInfoId())));
- if(userInfo!= null && StrUtil.isNotEmpty(userInfo.getTel())){
- String tel = userInfo.getTel();
- //短信推送 设备告警:设备${device_id}于${time}发生${exception_type}异常。请登录后台查看详情并及时处理。
- Long configId = MsgConfigId.SMS_NOTIFY_DEVICE.getId();
- String formatDate = LocalDateTimeUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN);
- Map<String, Object> templateParams = MapUtil.newHashMap();
- templateParams.put("device_id", deviceName);
- templateParams.put("time", formatDate);
- templateParams.put("exception_type", exceptionType);
- log.info("商户设备告警短信通知:{}", JSONUtil.toJsonPrettyStr(templateParams));
- // 扣除商户短信资源包
- R deduct = mercSmsService.deduct(new MercSmsPackageDto.Deduct()
- .setMercId(merc.getId())
- .setMsg("设备告警短信通知,设备号:" + deviceId)
- );
- if (deduct.getCode() == R.Enum.SUCCESS.getCode()) {
- R.feignCheck(aliSmsService.sendMsgCommon(new SmsSendCommonDTO().setSignName("").setMsgConfigId(configId).setMobile(tel).setTemplateParams(templateParams).setScene(SmsSceneEnum.B_DEVICE_ALARM_NOTIFY.getScene())));
- }else{
- log.info("设备告警短信通知#商户{}短信资源包不足,无法发送短信通知:{}",merc.getId());
- }
- }
- }
- }
- /**
- * 设备故障
- *
- * @param requestParams
- */
- @Override
- public void deviceFault(DeviceMqttDto.RequestParams requestParams) {
- DeviceFaultLogDto.MqttData mqttData = JSONUtil.toBean(requestParams.getData(), DeviceFaultLogDto.MqttData.class);
- // log.info("设备故障监控事件消费,接收mqtt data:{}", JSONUtil.toJsonPrettyStr(mqttData));
- if (mqttData != null) {
- DeviceEventMsgDto.Vo deviceEventMsg = mqttData.getDeviceEventMsg();
- Long mercId = deviceEventMsg.getMercId();
- String code = deviceEventMsg.getCode();
- Long deviceId = deviceEventMsg.getDeviceId();
- String msg = deviceEventMsg.getMsg();
- //加入故障日志
- DeviceFaultLog log = new DeviceFaultLog()
- .setMercId(mercId)
- .setDescribes(msg)
- .setEventCode(code)
- .setEventTime(deviceEventMsg.getCreateTime())
- .setDeviceId(deviceId);
- deviceFaultLogService.save(log);
- List<DeviceFaultMonitorConfigDto.Vo> configList = mqttData.getConfigList();
- // 按故障级别排序 降序
- List<DeviceFaultMonitorConfigDto.Vo> configSortedList = configList.stream()
- .sorted(Comparator.comparing(DeviceFaultMonitorConfigDto.Vo::getFaultLevel).reversed())
- .collect(Collectors.toList());
- for (DeviceFaultMonitorConfigDto.Vo config : configSortedList) {
- Integer eventNum = config.getEventNum();
- Long minuteNum = config.getMinuteNum();
- Integer faultLevel = config.getFaultLevel();
- Long monitorId = config.getMonitorId();
- Boolean toAfterSale = BooleanUtil.isTrue(config.getToAfterSale());
- // 获取当前时间
- Date now = DateUtil.date();
- // 计算最近n分钟的开始时间
- Date minutesAgo = DateUtil.offsetMinute(now, -minuteNum.intValue());
- //统计同类事件累计次数
- Long count = deviceFaultLogService.count(Wrappers.<DeviceFaultLog>lambdaQuery()
- .eq(DeviceFaultLog::getDeviceId, deviceId)
- .eq(DeviceFaultLog::getMercId, mercId)
- .eq(DeviceFaultLog::getEventCode, code)
- .ge(DeviceFaultLog::getEventTime, minutesAgo)
- );
- //达到阈值次数
- if (count != null && count.intValue() >= eventNum) {
- //故障表入库
- //是否已存在 未解决的
- DeviceFaultInfo deviceFaultInfo = deviceFaultInfoService.getOne(Wrappers.<DeviceFaultInfo>lambdaQuery()
- .eq(DeviceFaultInfo::getDeviceId, deviceId)
- .eq(DeviceFaultInfo::getMercId, mercId)
- .eq(DeviceFaultInfo::getEventCode, code)
- .eq(DeviceFaultInfo::getState, false)
- );
- if (deviceFaultInfo != null) {
- Integer faultLevel1 = deviceFaultInfo.getFaultLevel();
- if (faultLevel1.intValue() < faultLevel) {
- //小于,当前故障级别,更新
- deviceFaultInfo.setFaultLevel(faultLevel);
- deviceFaultInfo.setToAfterSale(toAfterSale);
- deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
- }
- //累加次数
- deviceFaultInfo.setEventNum(deviceFaultInfo.getEventNum() + 1).setMonitorId(monitorId);
- deviceFaultInfoService.updateById(deviceFaultInfo);
- } else {
- deviceFaultInfo = new DeviceFaultInfo()
- .setMercId(mercId)
- .setDeviceId(deviceId)
- .setDescribes(msg)
- .setFaultLevel(faultLevel)
- .setEventCode(code)
- .setEventNum(count.intValue())
- .setMonitorId(monitorId)
- .setToAfterSale(toAfterSale);
- deviceFaultInfoService.save(deviceFaultInfo);
- deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
- }
- //给售后推送故障信息
- if (toAfterSale.booleanValue()) {
- deviceFaultPushMsg(deviceEventMsg);
- }
- //已按最大故障级别排序,所以满足条件就退出循环
- break;
- }
- }
- }
- }
- /**
- * 故障推送给所有售后
- *
- * @param deviceEventMsg
- */
- public void deviceFaultPushMsg(DeviceEventMsgDto.Vo deviceEventMsg) {
- if (deviceEventMsg != null) {
- Long deviceId = deviceEventMsg.getDeviceId();
- Long mercId = deviceEventMsg.getMercId();
- LocalDateTime createTime = deviceEventMsg.getCreateTime();
- String msg = deviceEventMsg.getMsg();
- String code = deviceEventMsg.getCode();
- Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
- MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
- List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
- if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
- List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
- List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
- DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
- String deviceName = deviceInfo.getDeviceName();
- if (StrUtil.isEmpty(deviceName)) {
- deviceName = deviceId + "";
- } else {
- deviceName = deviceName + "(" + deviceId + ")";
- }
- // 获取售后及售后创建的角色用户
- UserInfoDto.SaleBackUsersVO saleBackUsersVO = R.feignCheckData(userInfoService.getSaleBackUsers());
- List<String> mpOpenIds = saleBackUsersVO.getMpOpenIds();
- //开门超时事件增加商户推送 ERR0020
- if ("ERR0020".equals(code)) {
- MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
- Long userInfoId = merc.getUserInfoId();
- List<Long> userInfoIdList = new ArrayList<>();
- userInfoIdList.add(userInfoId);
- List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
- new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
- if (CollUtil.isNotEmpty(mercUserDevices)) {
- mercUserDevices.forEach(mud -> {
- Long deviceIdUser = mud.getDeviceId();
- if (deviceId == deviceIdUser) {
- userInfoIdList.add(mud.getUserId());
- }
- });
- }
- List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
- if (CollUtil.isNotEmpty(userInfoList)) {
- Set<String> mercMpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
- .map(UserInfoDto.Vo::getMpOpenid)
- .collect(Collectors.toSet());
- mpOpenIds.addAll(mercMpOpenIds);
- }
- }
- if (CollUtil.isNotEmpty(mpOpenIds)) {
- Set<String> mpIds = new HashSet<>(mpOpenIds);
- for (MsgConfigTestDto.BizData b : bizDataList) {
- String channelType = b.getChannelType();
- if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
- //微信公众号
- //扩展参数
- MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
- Map<String, Object> params = MapUtil.newHashMap();
- params.put("keyword1", deviceName);
- params.put("keyword2", msg);
- params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
- //商户管理员发
- b.setReceivers(mpIds);
- b.setTemplateParams(params);
- b.setExtraParam(bizExtraParam);
- sendList.add(b);
- }
- }
- }
- //是否可推送条件
- if (CollUtil.isNotEmpty(sendList)) {
- msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
- }
- }
- }
- }
- /**
- * 设备联网状态
- *
- * @param requestParams
- * @param netState
- */
- private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) {
- JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
- String clientid = jsonObject.getStr("clientid");
- Long deviceId;
- try {
- deviceId = Long.valueOf(clientid);
- } catch (Exception e) {
- return;
- }
- //发布事件
- DeviceEvent.NetWorkEvent netWorkEvent = new DeviceEvent.NetWorkEvent()
- .setNetState(netState);
- netWorkEvent.setValue(deviceId);
- EventUtils.push(() -> netWorkEvent);
- }
- }
|