package com.xy.service; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DateUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.xy.config.DeviceThreadPoolConfig; import com.xy.device.EnumDeviceOnlineStatus; import com.xy.dto.*; import com.xy.dto.be.MercDto; import com.xy.dto.be.MercUserDeviceDto; import com.xy.entity.DeviceEventMsg; import com.xy.entity.DeviceInfo; import com.xy.entity.SysCodeConfigureRedis; import com.xy.enums.ChannelType; import com.xy.enums.MsgConfigId; import com.xy.enums.MsgType; import com.xy.service.be.MercFeignService; import com.xy.utils.R; import com.xy.utils.SysCodeConfigureUtils; import com.xy.utils.SysDictUtils; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.AllArgsConstructor; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @Service @AllArgsConstructor @Api(tags = "设备消费者") public class DeviceMqttConsumerImpl implements DeviceMqttConsumer { private DeviceStatusServiceImpl deviceStatusService; private DeviceNetRecordServiceImpl deviceNetRecordService; private MsgSendApiService msgSendApiService; private DeviceInfoServiceImpl deviceInfoService; private MercFeignService mercFeignService; private UserInfoService userInfoService; private MsgSysMonitorNoticeService msgSysMonitorNoticeService; @Override @ApiOperation("设备在线") @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL) public void connected(DeviceMqttDto.RequestParams requestParams) { Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class); netWork(requestParams, netState); } @Override @ApiOperation("设备离线") @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL) public void disconnect(DeviceMqttDto.RequestParams requestParams) { Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class); netWork(requestParams, netState); } @Override @ApiOperation("设备消息推送") public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) { DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class); if (deviceEventMsg != null) { Long deviceId = deviceEventMsg.getDeviceId(); Long mercId = deviceEventMsg.getMercId(); LocalDateTime createTime = deviceEventMsg.getCreateTime(); String msg = deviceEventMsg.getMsg(); String code = deviceEventMsg.getCode(); SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(code); String expand = sysCodeConfigureRedis.getExpand(); if (StrUtil.isEmpty(expand)) { return; } String paterCode = sysCodeConfigureRedis.getPaterCode(); SysCodeConfigureRedis parentCodeConfig = SysCodeConfigureUtils.get(paterCode); String eventType = parentCodeConfig.getMsg(); String title = sysCodeConfigureRedis.getTitle(); //是否展示给商户 boolean expShow2Merc = SysCodeConfigureUtils.getExpShow2Merc(expand); //是否推送给商户 boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand); //是否推送时间段 boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand); Long configId = MsgConfigId.DEVICE_EXCEPTION.getId(); MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId))); List bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId))); if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) { List bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class); List sendList = new ArrayList<>(); /** * {{first.DATA}} * 设备名称:{{keyword1.DATA}} * 告警内容:{{keyword2.DATA}} * 发生时间:{{keyword3.DATA}} * {{remark.DATA}} * */ DeviceInfo deviceInfo = deviceInfoService.getById(deviceId); String deviceName = deviceInfo.getDeviceName(); if (StrUtil.isEmpty(deviceName)) { deviceName = deviceId + ""; } else { deviceName = deviceName + "(" + deviceId + ")"; } MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId))); Long userInfoId = merc.getUserInfoId(); List userInfoIdList = new ArrayList<>(); userInfoIdList.add(userInfoId); List mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers( new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId))); if (CollUtil.isNotEmpty(mercUserDevices)) { mercUserDevices.forEach(mud -> { Long deviceIdUser = mud.getDeviceId(); if (deviceId == deviceIdUser) { userInfoIdList.add(mud.getUserId()); } }); } StringBuilder sbMsg = StrUtil.builder().append("设备") .append("【") .append(deviceName) .append("】") .append("发生故障:").append(msg).append(",发生时间:") .append(DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN)); //查询userinfo表的mpOpenId List userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList))); //配置了展示给商户才生成数据 if (expShow2Merc) { userInfoList.forEach(u -> { String name = u.getName(); //系统消息 MsgSysMonitorNoticeDto.Save msgSys = new MsgSysMonitorNoticeDto.Save(); msgSys.setUserId(u.getAuthorizeUserId()); msgSys.setMsgType(MsgType.TYPE6.getCode()).setContent(sbMsg.toString()) .setPubTime(LocalDateTime.now()).setPubState(2) .setSendTargets(name) .setPriority(msgConfig.getPriority()) .setTitle(eventType + "-" + title) .setEventCode(code) .setPubUserId(-1).setTitle(MsgType.TYPE6.getDescription()); msgSysMonitorNoticeService.save(msgSys); }); } for (MsgConfigTestDto.BizData b : bizDataList) { String channelType = b.getChannelType(); if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) { //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息, 查询微信公众号openID Set mpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid())) .map(UserInfoDto.Vo::getMpOpenid) .collect(Collectors.toSet()); if (CollUtil.isEmpty(mpOpenIds)) { continue; } //微信公众号 //扩展参数 MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam(); if (ObjectUtil.isNotEmpty(sysCodeConfigureRedis)) { String propose = sysCodeConfigureRedis.getPropose(); if (StrUtil.isNotEmpty(propose) && propose.contains("http")) { bizExtraParam.setUrl(propose); } } Map params = MapUtil.newHashMap(); params.put("keyword1", deviceName); params.put("keyword2", msg); params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN)); //商户管理员发 b.setReceivers(mpOpenIds); b.setTemplateParams(params); b.setExtraParam(bizExtraParam); sendList.add(b); } } //是否可推送条件 if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) { msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList)); } } } } /** * 设备联网状态 * * @param requestParams * @param netState */ private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) { JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData()); String clientid = jsonObject.getStr("clientid"); Long deviceId; try { deviceId = Long.valueOf(clientid); } catch (Exception e) { return; } //上报状态 DeviceStatusDto.Up up = new DeviceStatusDto.Up().setDeviceId(deviceId); up.setNetState(netState); deviceStatusService.up(up).getCode(); //添加联网记录 DeviceNetRecordDto.Save save = new DeviceNetRecordDto.Save() .setDeviceId(deviceId); save.setNetStatus(netState); deviceNetRecordService.save(save); } }