DeviceMqttConsumerImpl.java 21 KB


  1. package com.xy.service;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.collection.CollUtil;
  4. import cn.hutool.core.date.DatePattern;
  5. import cn.hutool.core.date.DateUtil;
  6. import cn.hutool.core.map.MapUtil;
  7. import cn.hutool.core.util.BooleanUtil;
  8. import cn.hutool.core.util.ObjectUtil;
  9. import cn.hutool.core.util.StrUtil;
  10. import cn.hutool.json.JSONObject;
  11. import cn.hutool.json.JSONUtil;
  12. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  13. import com.xy.config.DeviceThreadPoolConfig;
  14. import com.xy.device.EnumDeviceOnlineStatus;
  15. import com.xy.dto.*;
  16. import com.xy.dto.be.MercDto;
  17. import com.xy.dto.be.MercUserDeviceDto;
  18. import com.xy.entity.*;
  19. import com.xy.enums.ChannelType;
  20. import com.xy.enums.MsgConfigId;
  21. import com.xy.enums.MsgType;
  22. import com.xy.service.be.MercFeignService;
  23. import com.xy.utils.MercAuthUtils;
  24. import com.xy.utils.R;
  25. import com.xy.utils.SysCodeConfigureUtils;
  26. import com.xy.utils.SysDictUtils;
  27. import io.swagger.annotations.Api;
  28. import io.swagger.annotations.ApiOperation;
  29. import lombok.AllArgsConstructor;
  30. import lombok.extern.slf4j.Slf4j;
  31. import org.springframework.scheduling.annotation.Async;
  32. import org.springframework.stereotype.Service;
  33. import java.time.LocalDateTime;
  34. import java.util.*;
  35. import java.util.stream.Collectors;
  36. @Slf4j
  37. @Service
  38. @AllArgsConstructor
  39. @Api(tags = "设备消费者")
  40. public class DeviceMqttConsumerImpl implements DeviceMqttConsumer {
  41. private DeviceStatusServiceImpl deviceStatusService;
  42. private DeviceNetRecordServiceImpl deviceNetRecordService;
  43. private MsgSendApiService msgSendApiService;
  44. private DeviceInfoServiceImpl deviceInfoService;
  45. private MercFeignService mercFeignService;
  46. private UserInfoService userInfoService;
  47. private MsgSysMonitorNoticeService msgSysMonitorNoticeService;
  48. private DeviceFaultLogServiceImpl deviceFaultLogService;
  49. private DeviceFaultInfoServiceImpl deviceFaultInfoService;
  50. @Override
  51. @ApiOperation("设备在线")
  52. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  53. public void connected(DeviceMqttDto.RequestParams requestParams) {
  54. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
  55. netWork(requestParams, netState);
  56. }
  57. @Override
  58. @ApiOperation("设备离线")
  59. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  60. public void disconnect(DeviceMqttDto.RequestParams requestParams) {
  61. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
  62. netWork(requestParams, netState);
  63. }
  64. @Override
  65. @ApiOperation("设备消息推送")
  66. public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) {
  67. DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class);
  68. if (deviceEventMsg != null) {
  69. Long deviceId = deviceEventMsg.getDeviceId();
  70. Long mercId = deviceEventMsg.getMercId();
  71. MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
  72. String mercCode = merc.getMercCode();
  73. LocalDateTime createTime = deviceEventMsg.getCreateTime();
  74. String msg = deviceEventMsg.getMsg();
  75. String code = deviceEventMsg.getCode();
  76. SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(code);
  77. String expand = sysCodeConfigureRedis.getExpand();
  78. if (StrUtil.isEmpty(expand)) {
  79. return;
  80. }
  81. String paterCode = sysCodeConfigureRedis.getPaterCode();
  82. SysCodeConfigureRedis parentCodeConfig = SysCodeConfigureUtils.get(paterCode);
  83. String eventType = parentCodeConfig.getMsg();
  84. String title = sysCodeConfigureRedis.getTitle();
  85. //是否展示给商户
  86. boolean expShow2Merc = SysCodeConfigureUtils.getExpShow2Merc(expand);
  87. //是否推送给商户
  88. boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand);
  89. //是否推送时间段
  90. boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand);
  91. Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
  92. if (MercAuthUtils.sendMsgToCM(mercCode)) {
  93. configId = MsgConfigId.CM_DEVICE_EXCEPTION.getId();
  94. }
  95. MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
  96. List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
  97. if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
  98. List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
  99. List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
  100. /**
  101. * {{first.DATA}}
  102. * 设备名称:{{keyword1.DATA}}
  103. * 告警内容:{{keyword2.DATA}}
  104. * 发生时间:{{keyword3.DATA}}
  105. * {{remark.DATA}}
  106. *
  107. */
  108. DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
  109. String deviceName = deviceInfo.getDeviceName();
  110. if (StrUtil.isEmpty(deviceName)) {
  111. deviceName = deviceId + "";
  112. } else {
  113. deviceName = deviceName + "(" + deviceId + ")";
  114. }
  115. Long userInfoId = merc.getUserInfoId();
  116. List<Long> userInfoIdList = new ArrayList<>();
  117. userInfoIdList.add(userInfoId);
  118. List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
  119. new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
  120. if (CollUtil.isNotEmpty(mercUserDevices)) {
  121. mercUserDevices.forEach(mud -> {
  122. Long deviceIdUser = mud.getDeviceId();
  123. if (deviceId == deviceIdUser) {
  124. userInfoIdList.add(mud.getUserId());
  125. }
  126. });
  127. }
  128. StringBuilder sbMsg = StrUtil.builder().append("设备")
  129. .append("【")
  130. .append(deviceName)
  131. .append("】")
  132. .append("发生故障:").append(msg).append(",发生时间:")
  133. .append(DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  134. //查询userinfo表的mpOpenId
  135. List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
  136. //配置了展示给商户才生成数据
  137. if (expShow2Merc) {
  138. userInfoList.forEach(u -> {
  139. String name = u.getName();
  140. //系统消息
  141. MsgSysMonitorNoticeDto.Save msgSys = new MsgSysMonitorNoticeDto.Save();
  142. msgSys.setUserId(u.getAuthorizeUserId());
  143. msgSys.setMsgType(MsgType.TYPE6.getCode()).setContent(sbMsg.toString())
  144. .setPubTime(LocalDateTime.now()).setPubState(2)
  145. .setSendTargets(name)
  146. .setPriority(msgConfig.getPriority())
  147. .setTitle(eventType + "-" + title)
  148. .setEventCode(code)
  149. .setPubUserId(-1).setTitle(MsgType.TYPE6.getDescription());
  150. msgSysMonitorNoticeService.save(msgSys);
  151. });
  152. }
  153. for (MsgConfigTestDto.BizData b : bizDataList) {
  154. String channelType = b.getChannelType();
  155. if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
  156. //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息, 查询微信公众号openID
  157. Set<String> mpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
  158. .map(UserInfoDto.Vo::getMpOpenid)
  159. .collect(Collectors.toSet());
  160. if (CollUtil.isEmpty(mpOpenIds)) {
  161. continue;
  162. }
  163. //微信公众号
  164. //扩展参数
  165. MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
  166. if (ObjectUtil.isNotEmpty(sysCodeConfigureRedis)) {
  167. String propose = sysCodeConfigureRedis.getPropose();
  168. if (StrUtil.isNotEmpty(propose) && propose.contains("http")) {
  169. bizExtraParam.setUrl(propose);
  170. }
  171. }
  172. Map<String, Object> params = MapUtil.newHashMap();
  173. if (MercAuthUtils.sendMsgToCM(mercCode)) {
  174. //设备编号:{{character_string2.DATA}} 故障类型:{{thing6.DATA}} 报障时间:{{time8.DATA}}
  175. params.put("character_string2", deviceName);
  176. params.put("thing6", msg);
  177. params.put("time8", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  178. } else {
  179. params.put("keyword1", deviceName);
  180. params.put("keyword2", msg);
  181. params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  182. }
  183. //商户管理员发
  184. b.setReceivers(mpOpenIds);
  185. b.setTemplateParams(params);
  186. b.setExtraParam(bizExtraParam);
  187. sendList.add(b);
  188. }
  189. }
  190. //是否可推送条件
  191. if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) {
  192. msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
  193. }
  194. }
  195. }
  196. }
  197. /**
  198. * 设备故障
  199. *
  200. * @param requestParams
  201. */
  202. @Override
  203. public void deviceFault(DeviceMqttDto.RequestParams requestParams) {
  204. DeviceFaultLogDto.MqttData mqttData = JSONUtil.toBean(requestParams.getData(), DeviceFaultLogDto.MqttData.class);
  205. // log.info("设备故障监控事件消费,接收mqtt data:{}", JSONUtil.toJsonPrettyStr(mqttData));
  206. if (mqttData != null) {
  207. DeviceEventMsgDto.Vo deviceEventMsg = mqttData.getDeviceEventMsg();
  208. Long mercId = deviceEventMsg.getMercId();
  209. String code = deviceEventMsg.getCode();
  210. Long deviceId = deviceEventMsg.getDeviceId();
  211. String msg = deviceEventMsg.getMsg();
  212. //加入故障日志
  213. DeviceFaultLog log = new DeviceFaultLog()
  214. .setMercId(mercId)
  215. .setDescribes(msg)
  216. .setEventCode(code)
  217. .setEventTime(deviceEventMsg.getCreateTime())
  218. .setDeviceId(deviceId);
  219. deviceFaultLogService.save(log);
  220. List<DeviceFaultMonitorConfigDto.Vo> configList = mqttData.getConfigList();
  221. // 按故障级别排序 降序
  222. List<DeviceFaultMonitorConfigDto.Vo> configSortedList = configList.stream()
  223. .sorted(Comparator.comparing(DeviceFaultMonitorConfigDto.Vo::getFaultLevel).reversed())
  224. .collect(Collectors.toList());
  225. for (DeviceFaultMonitorConfigDto.Vo config : configSortedList) {
  226. Integer eventNum = config.getEventNum();
  227. Long minuteNum = config.getMinuteNum();
  228. Integer faultLevel = config.getFaultLevel();
  229. Long monitorId = config.getMonitorId();
  230. Boolean toAfterSale = BooleanUtil.isTrue(config.getToAfterSale());
  231. // 获取当前时间
  232. Date now = DateUtil.date();
  233. // 计算最近n分钟的开始时间
  234. Date minutesAgo = DateUtil.offsetMinute(now, -minuteNum.intValue());
  235. //统计同类事件累计次数
  236. Long count = deviceFaultLogService.count(Wrappers.<DeviceFaultLog>lambdaQuery()
  237. .eq(DeviceFaultLog::getDeviceId, deviceId)
  238. .eq(DeviceFaultLog::getMercId, mercId)
  239. .eq(DeviceFaultLog::getEventCode, code)
  240. .ge(DeviceFaultLog::getEventTime, minutesAgo)
  241. );
  242. //达到阈值次数
  243. if (count != null && count.intValue() >= eventNum) {
  244. //故障表入库
  245. //是否已存在 未解决的
  246. DeviceFaultInfo deviceFaultInfo = deviceFaultInfoService.getOne(Wrappers.<DeviceFaultInfo>lambdaQuery()
  247. .eq(DeviceFaultInfo::getDeviceId, deviceId)
  248. .eq(DeviceFaultInfo::getMercId, mercId)
  249. .eq(DeviceFaultInfo::getEventCode, code)
  250. .eq(DeviceFaultInfo::getState, false)
  251. );
  252. if (deviceFaultInfo != null) {
  253. Integer faultLevel1 = deviceFaultInfo.getFaultLevel();
  254. if (faultLevel1.intValue() < faultLevel) {
  255. //小于,当前故障级别,更新
  256. deviceFaultInfo.setFaultLevel(faultLevel);
  257. deviceFaultInfo.setToAfterSale(toAfterSale);
  258. deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
  259. }
  260. //累加次数
  261. deviceFaultInfo.setEventNum(deviceFaultInfo.getEventNum() + 1).setMonitorId(monitorId);
  262. deviceFaultInfoService.updateById(deviceFaultInfo);
  263. } else {
  264. deviceFaultInfo = new DeviceFaultInfo()
  265. .setMercId(mercId)
  266. .setDeviceId(deviceId)
  267. .setDescribes(msg)
  268. .setFaultLevel(faultLevel)
  269. .setEventCode(code)
  270. .setEventNum(count.intValue())
  271. .setMonitorId(monitorId)
  272. .setToAfterSale(toAfterSale);
  273. deviceFaultInfoService.save(deviceFaultInfo);
  274. deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
  275. }
  276. //给售后推送故障信息
  277. if (toAfterSale.booleanValue()) {
  278. deviceFaultPushMsg(deviceEventMsg);
  279. }
  280. //已按最大故障级别排序,所以满足条件就退出循环
  281. break;
  282. }
  283. }
  284. }
  285. }
  286. /**
  287. * 故障推送给所有售后
  288. *
  289. * @param deviceEventMsg
  290. */
  291. public void deviceFaultPushMsg(DeviceEventMsgDto.Vo deviceEventMsg) {
  292. if (deviceEventMsg != null) {
  293. Long deviceId = deviceEventMsg.getDeviceId();
  294. Long mercId = deviceEventMsg.getMercId();
  295. LocalDateTime createTime = deviceEventMsg.getCreateTime();
  296. String msg = deviceEventMsg.getMsg();
  297. String code = deviceEventMsg.getCode();
  298. Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
  299. MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
  300. List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
  301. if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
  302. List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
  303. List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
  304. DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
  305. String deviceName = deviceInfo.getDeviceName();
  306. if (StrUtil.isEmpty(deviceName)) {
  307. deviceName = deviceId + "";
  308. } else {
  309. deviceName = deviceName + "(" + deviceId + ")";
  310. }
  311. // 获取售后及售后创建的角色用户
  312. UserInfoDto.SaleBackUsersVO saleBackUsersVO = R.feignCheckData(userInfoService.getSaleBackUsers());
  313. List<String> mpOpenIds = saleBackUsersVO.getMpOpenIds();
  314. //开门超时事件增加商户推送 ERR0020
  315. if ("ERR0020".equals(code)) {
  316. MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
  317. Long userInfoId = merc.getUserInfoId();
  318. List<Long> userInfoIdList = new ArrayList<>();
  319. userInfoIdList.add(userInfoId);
  320. List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
  321. new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
  322. if (CollUtil.isNotEmpty(mercUserDevices)) {
  323. mercUserDevices.forEach(mud -> {
  324. Long deviceIdUser = mud.getDeviceId();
  325. if (deviceId == deviceIdUser) {
  326. userInfoIdList.add(mud.getUserId());
  327. }
  328. });
  329. }
  330. List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
  331. if (CollUtil.isNotEmpty(userInfoList)) {
  332. Set<String> mercMpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
  333. .map(UserInfoDto.Vo::getMpOpenid)
  334. .collect(Collectors.toSet());
  335. mpOpenIds.addAll(mercMpOpenIds);
  336. }
  337. }
  338. if (CollUtil.isNotEmpty(mpOpenIds)) {
  339. Set<String> mpIds = new HashSet<>(mpOpenIds);
  340. for (MsgConfigTestDto.BizData b : bizDataList) {
  341. String channelType = b.getChannelType();
  342. if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
  343. //微信公众号
  344. //扩展参数
  345. MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
  346. Map<String, Object> params = MapUtil.newHashMap();
  347. params.put("keyword1", deviceName);
  348. params.put("keyword2", msg);
  349. params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  350. //商户管理员发
  351. b.setReceivers(mpIds);
  352. b.setTemplateParams(params);
  353. b.setExtraParam(bizExtraParam);
  354. sendList.add(b);
  355. }
  356. }
  357. }
  358. //是否可推送条件
  359. if (CollUtil.isNotEmpty(sendList)) {
  360. msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
  361. }
  362. }
  363. }
  364. }
  365. /**
  366. * 设备联网状态
  367. *
  368. * @param requestParams
  369. * @param netState
  370. */
  371. private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) {
  372. JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
  373. String clientid = jsonObject.getStr("clientid");
  374. Long deviceId;
  375. try {
  376. deviceId = Long.valueOf(clientid);
  377. } catch (Exception e) {
  378. return;
  379. }
  380. //上报状态
  381. DeviceStatusDto.Up up = new DeviceStatusDto.Up().setDeviceId(deviceId);
  382. up.setNetState(netState);
  383. deviceStatusService.up(up).getCode();
  384. //添加联网记录
  385. DeviceNetRecordDto.Save save = new DeviceNetRecordDto.Save()
  386. .setDeviceId(deviceId);
  387. save.setNetStatus(netState);
  388. deviceNetRecordService.save(save);
  389. }
  390. }