DeviceMqttConsumerImpl.java 15 KB


  1. package com.xy.service;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.collection.CollUtil;
  4. import cn.hutool.core.date.DatePattern;
  5. import cn.hutool.core.date.DateUtil;
  6. import cn.hutool.core.map.MapUtil;
  7. import cn.hutool.core.util.ObjectUtil;
  8. import cn.hutool.core.util.StrUtil;
  9. import cn.hutool.json.JSONObject;
  10. import cn.hutool.json.JSONUtil;
  11. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  12. import com.xy.config.DeviceThreadPoolConfig;
  13. import com.xy.device.EnumDeviceOnlineStatus;
  14. import com.xy.dto.*;
  15. import com.xy.dto.be.MercDto;
  16. import com.xy.dto.be.MercUserDeviceDto;
  17. import com.xy.entity.*;
  18. import com.xy.enums.ChannelType;
  19. import com.xy.enums.MsgConfigId;
  20. import com.xy.enums.MsgType;
  21. import com.xy.service.be.MercFeignService;
  22. import com.xy.utils.R;
  23. import com.xy.utils.SysCodeConfigureUtils;
  24. import com.xy.utils.SysDictUtils;
  25. import io.swagger.annotations.Api;
  26. import io.swagger.annotations.ApiOperation;
  27. import lombok.AllArgsConstructor;
  28. import lombok.extern.slf4j.Slf4j;
  29. import org.springframework.scheduling.annotation.Async;
  30. import org.springframework.stereotype.Service;
  31. import java.time.LocalDateTime;
  32. import java.util.*;
  33. import java.util.stream.Collectors;
  34. @Slf4j
  35. @Service
  36. @AllArgsConstructor
  37. @Api(tags = "设备消费者")
  38. public class DeviceMqttConsumerImpl implements DeviceMqttConsumer {
  39. private DeviceStatusServiceImpl deviceStatusService;
  40. private DeviceNetRecordServiceImpl deviceNetRecordService;
  41. private MsgSendApiService msgSendApiService;
  42. private DeviceInfoServiceImpl deviceInfoService;
  43. private MercFeignService mercFeignService;
  44. private UserInfoService userInfoService;
  45. private MsgSysMonitorNoticeService msgSysMonitorNoticeService;
  46. private DeviceFaultLogServiceImpl deviceFaultLogService;
  47. private DeviceFaultInfoServiceImpl deviceFaultInfoService;
  48. @Override
  49. @ApiOperation("设备在线")
  50. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  51. public void connected(DeviceMqttDto.RequestParams requestParams) {
  52. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
  53. netWork(requestParams, netState);
  54. }
  55. @Override
  56. @ApiOperation("设备离线")
  57. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  58. public void disconnect(DeviceMqttDto.RequestParams requestParams) {
  59. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
  60. netWork(requestParams, netState);
  61. }
  62. @Override
  63. @ApiOperation("设备消息推送")
  64. public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) {
  65. DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class);
  66. if (deviceEventMsg != null) {
  67. Long deviceId = deviceEventMsg.getDeviceId();
  68. Long mercId = deviceEventMsg.getMercId();
  69. LocalDateTime createTime = deviceEventMsg.getCreateTime();
  70. String msg = deviceEventMsg.getMsg();
  71. String code = deviceEventMsg.getCode();
  72. SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(code);
  73. String expand = sysCodeConfigureRedis.getExpand();
  74. if (StrUtil.isEmpty(expand)) {
  75. return;
  76. }
  77. String paterCode = sysCodeConfigureRedis.getPaterCode();
  78. SysCodeConfigureRedis parentCodeConfig = SysCodeConfigureUtils.get(paterCode);
  79. String eventType = parentCodeConfig.getMsg();
  80. String title = sysCodeConfigureRedis.getTitle();
  81. //是否展示给商户
  82. boolean expShow2Merc = SysCodeConfigureUtils.getExpShow2Merc(expand);
  83. //是否推送给商户
  84. boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand);
  85. //是否推送时间段
  86. boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand);
  87. Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
  88. MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
  89. List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
  90. if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
  91. List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
  92. List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
  93. /**
  94. * {{first.DATA}}
  95. * 设备名称:{{keyword1.DATA}}
  96. * 告警内容:{{keyword2.DATA}}
  97. * 发生时间:{{keyword3.DATA}}
  98. * {{remark.DATA}}
  99. *
  100. */
  101. DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
  102. String deviceName = deviceInfo.getDeviceName();
  103. if (StrUtil.isEmpty(deviceName)) {
  104. deviceName = deviceId + "";
  105. } else {
  106. deviceName = deviceName + "(" + deviceId + ")";
  107. }
  108. MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
  109. Long userInfoId = merc.getUserInfoId();
  110. List<Long> userInfoIdList = new ArrayList<>();
  111. userInfoIdList.add(userInfoId);
  112. List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
  113. new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
  114. if (CollUtil.isNotEmpty(mercUserDevices)) {
  115. mercUserDevices.forEach(mud -> {
  116. Long deviceIdUser = mud.getDeviceId();
  117. if (deviceId == deviceIdUser) {
  118. userInfoIdList.add(mud.getUserId());
  119. }
  120. });
  121. }
  122. StringBuilder sbMsg = StrUtil.builder().append("设备")
  123. .append("【")
  124. .append(deviceName)
  125. .append("】")
  126. .append("发生故障:").append(msg).append(",发生时间:")
  127. .append(DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  128. //查询userinfo表的mpOpenId
  129. List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
  130. //配置了展示给商户才生成数据
  131. if (expShow2Merc) {
  132. userInfoList.forEach(u -> {
  133. String name = u.getName();
  134. //系统消息
  135. MsgSysMonitorNoticeDto.Save msgSys = new MsgSysMonitorNoticeDto.Save();
  136. msgSys.setUserId(u.getAuthorizeUserId());
  137. msgSys.setMsgType(MsgType.TYPE6.getCode()).setContent(sbMsg.toString())
  138. .setPubTime(LocalDateTime.now()).setPubState(2)
  139. .setSendTargets(name)
  140. .setPriority(msgConfig.getPriority())
  141. .setTitle(eventType + "-" + title)
  142. .setEventCode(code)
  143. .setPubUserId(-1).setTitle(MsgType.TYPE6.getDescription());
  144. msgSysMonitorNoticeService.save(msgSys);
  145. });
  146. }
  147. for (MsgConfigTestDto.BizData b : bizDataList) {
  148. String channelType = b.getChannelType();
  149. if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
  150. //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息, 查询微信公众号openID
  151. Set<String> mpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
  152. .map(UserInfoDto.Vo::getMpOpenid)
  153. .collect(Collectors.toSet());
  154. if (CollUtil.isEmpty(mpOpenIds)) {
  155. continue;
  156. }
  157. //微信公众号
  158. //扩展参数
  159. MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
  160. if (ObjectUtil.isNotEmpty(sysCodeConfigureRedis)) {
  161. String propose = sysCodeConfigureRedis.getPropose();
  162. if (StrUtil.isNotEmpty(propose) && propose.contains("http")) {
  163. bizExtraParam.setUrl(propose);
  164. }
  165. }
  166. Map<String, Object> params = MapUtil.newHashMap();
  167. params.put("keyword1", deviceName);
  168. params.put("keyword2", msg);
  169. params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  170. //商户管理员发
  171. b.setReceivers(mpOpenIds);
  172. b.setTemplateParams(params);
  173. b.setExtraParam(bizExtraParam);
  174. sendList.add(b);
  175. }
  176. }
  177. //是否可推送条件
  178. if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) {
  179. msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
  180. }
  181. }
  182. }
  183. }
  184. /**
  185. * 设备故障
  186. *
  187. * @param requestParams
  188. */
  189. @Override
  190. public void deviceFault(DeviceMqttDto.RequestParams requestParams) {
  191. DeviceFaultLogDto.MqttData mqttData = JSONUtil.toBean(requestParams.getData(), DeviceFaultLogDto.MqttData.class);
  192. // log.info("设备故障监控事件消费,接收mqtt data:{}", JSONUtil.toJsonPrettyStr(mqttData));
  193. if (mqttData != null) {
  194. DeviceEventMsgDto.Vo deviceEventMsg = mqttData.getDeviceEventMsg();
  195. Long mercId = deviceEventMsg.getMercId();
  196. String code = deviceEventMsg.getCode();
  197. Long deviceId = deviceEventMsg.getDeviceId();
  198. String msg = deviceEventMsg.getMsg();
  199. //加入故障日志
  200. DeviceFaultLog log = new DeviceFaultLog()
  201. .setMercId(mercId)
  202. .setDescribes(msg)
  203. .setEventCode(code)
  204. .setEventTime(deviceEventMsg.getCreateTime())
  205. .setDeviceId(deviceId);
  206. deviceFaultLogService.save(log);
  207. List<DeviceFaultMonitorConfigDto.Vo> configList = mqttData.getConfigList();
  208. // 按故障级别排序 降序
  209. List<DeviceFaultMonitorConfigDto.Vo> configSortedList = configList.stream()
  210. .sorted(Comparator.comparing(DeviceFaultMonitorConfigDto.Vo::getFaultLevel).reversed())
  211. .collect(Collectors.toList());
  212. for (DeviceFaultMonitorConfigDto.Vo config : configSortedList) {
  213. Integer eventNum = config.getEventNum();
  214. Long minuteNum = config.getMinuteNum();
  215. Integer faultLevel = config.getFaultLevel();
  216. Long monitorId = config.getMonitorId();
  217. Boolean toAfterSale = config.getToAfterSale();
  218. // 获取当前时间
  219. Date now = DateUtil.date();
  220. // 计算最近n分钟的开始时间
  221. Date minutesAgo = DateUtil.offsetMinute(now, -minuteNum.intValue());
  222. //统计同类事件累计次数
  223. Long count = deviceFaultLogService.count(Wrappers.<DeviceFaultLog>lambdaQuery()
  224. .eq(DeviceFaultLog::getDeviceId, deviceId)
  225. .eq(DeviceFaultLog::getMercId, mercId)
  226. .eq(DeviceFaultLog::getEventCode, code)
  227. .ge(DeviceFaultLog::getEventTime, minutesAgo)
  228. );
  229. //达到阈值次数
  230. if (count != null && count.intValue() >= eventNum) {
  231. //故障表入库
  232. //是否已存在 未解决的
  233. DeviceFaultInfo deviceFaultInfo = deviceFaultInfoService.getOne(Wrappers.<DeviceFaultInfo>lambdaQuery()
  234. .eq(DeviceFaultInfo::getDeviceId, deviceId)
  235. .eq(DeviceFaultInfo::getMercId, mercId)
  236. .eq(DeviceFaultInfo::getEventCode, code)
  237. .eq(DeviceFaultInfo::getState, false)
  238. );
  239. if (deviceFaultInfo != null) {
  240. Integer faultLevel1 = deviceFaultInfo.getFaultLevel();
  241. if (faultLevel1.intValue() < faultLevel) {
  242. //小于,当前故障级别,更新
  243. deviceFaultInfo.setFaultLevel(faultLevel);
  244. deviceFaultInfo.setToAfterSale(toAfterSale);
  245. deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
  246. }
  247. //累加次数
  248. deviceFaultInfo.setEventNum(deviceFaultInfo.getEventNum() + 1).setMonitorId(monitorId);
  249. deviceFaultInfoService.updateById(deviceFaultInfo);
  250. } else {
  251. deviceFaultInfoService.save(new DeviceFaultInfo()
  252. .setMercId(mercId)
  253. .setDeviceId(deviceId)
  254. .setDescribes(msg)
  255. .setFaultLevel(faultLevel)
  256. .setEventCode(code)
  257. .setEventNum(count.intValue())
  258. .setMonitorId(monitorId)
  259. .setToAfterSale(toAfterSale)
  260. );
  261. deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
  262. }
  263. //已按最大故障级别排序,所以满足条件就退出循环
  264. break;
  265. }
  266. }
  267. }
  268. }
  269. /**
  270. * 设备联网状态
  271. *
  272. * @param requestParams
  273. * @param netState
  274. */
  275. private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) {
  276. JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
  277. String clientid = jsonObject.getStr("clientid");
  278. Long deviceId;
  279. try {
  280. deviceId = Long.valueOf(clientid);
  281. } catch (Exception e) {
  282. return;
  283. }
  284. //上报状态
  285. DeviceStatusDto.Up up = new DeviceStatusDto.Up().setDeviceId(deviceId);
  286. up.setNetState(netState);
  287. deviceStatusService.up(up).getCode();
  288. //添加联网记录
  289. DeviceNetRecordDto.Save save = new DeviceNetRecordDto.Save()
  290. .setDeviceId(deviceId);
  291. save.setNetStatus(netState);
  292. deviceNetRecordService.save(save);
  293. }
  294. }