DeviceMqttConsumerImpl.java 21 KB


  1. package com.xy.service;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.collection.CollUtil;
  4. import cn.hutool.core.date.DatePattern;
  5. import cn.hutool.core.date.DateUtil;
  6. import cn.hutool.core.map.MapUtil;
  7. import cn.hutool.core.util.BooleanUtil;
  8. import cn.hutool.core.util.ObjectUtil;
  9. import cn.hutool.core.util.StrUtil;
  10. import cn.hutool.json.JSONObject;
  11. import cn.hutool.json.JSONUtil;
  12. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  13. import com.xy.config.DeviceThreadPoolConfig;
  14. import com.xy.device.EnumDeviceOnlineStatus;
  15. import com.xy.dto.*;
  16. import com.xy.dto.be.MercDto;
  17. import com.xy.dto.be.MercUserDeviceDto;
  18. import com.xy.entity.*;
  19. import com.xy.enums.ChannelType;
  20. import com.xy.enums.MsgConfigId;
  21. import com.xy.enums.MsgType;
  22. import com.xy.event.DeviceEvent;
  23. import com.xy.service.be.MercFeignService;
  24. import com.xy.utils.*;
  25. import io.swagger.annotations.Api;
  26. import io.swagger.annotations.ApiOperation;
  27. import lombok.AllArgsConstructor;
  28. import lombok.extern.slf4j.Slf4j;
  29. import org.springframework.scheduling.annotation.Async;
  30. import org.springframework.stereotype.Service;
  31. import java.time.LocalDateTime;
  32. import java.util.*;
  33. import java.util.stream.Collectors;
  34. @Slf4j
  35. @Service
  36. @AllArgsConstructor
  37. @Api(tags = "设备消费者")
  38. public class DeviceMqttConsumerImpl implements DeviceMqttConsumer {
  39. private MsgSendApiService msgSendApiService;
  40. private DeviceInfoServiceImpl deviceInfoService;
  41. private MercFeignService mercFeignService;
  42. private UserInfoService userInfoService;
  43. private MsgSysMonitorNoticeService msgSysMonitorNoticeService;
  44. private DeviceFaultLogServiceImpl deviceFaultLogService;
  45. private DeviceFaultInfoServiceImpl deviceFaultInfoService;
  46. @Override
  47. @ApiOperation("设备在线")
  48. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  49. public void connected(DeviceMqttDto.RequestParams requestParams) {
  50. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
  51. netWork(requestParams, netState);
  52. }
  53. @Override
  54. @ApiOperation("设备离线")
  55. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  56. public void disconnect(DeviceMqttDto.RequestParams requestParams) {
  57. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
  58. netWork(requestParams, netState);
  59. }
  60. @Override
  61. @ApiOperation("设备消息推送")
  62. public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) {
  63. DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class);
  64. if (deviceEventMsg != null) {
  65. Long deviceId = deviceEventMsg.getDeviceId();
  66. Long mercId = deviceEventMsg.getMercId();
  67. MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
  68. String mercCode = merc.getMercCode();
  69. LocalDateTime createTime = deviceEventMsg.getCreateTime();
  70. String msg = deviceEventMsg.getMsg();
  71. String code = deviceEventMsg.getCode();
  72. SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(code);
  73. String expand = sysCodeConfigureRedis.getExpand();
  74. if (StrUtil.isEmpty(expand)) {
  75. return;
  76. }
  77. String paterCode = sysCodeConfigureRedis.getPaterCode();
  78. SysCodeConfigureRedis parentCodeConfig = SysCodeConfigureUtils.get(paterCode);
  79. String eventType = parentCodeConfig.getMsg();
  80. String title = sysCodeConfigureRedis.getTitle();
  81. //是否展示给商户
  82. boolean expShow2Merc = SysCodeConfigureUtils.getExpShow2Merc(expand);
  83. //是否推送给商户
  84. boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand);
  85. //是否推送时间段
  86. boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand);
  87. Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
  88. if (MercAuthUtils.sendMsgToCM(mercCode)) {
  89. configId = MsgConfigId.CM_DEVICE_EXCEPTION.getId();
  90. }
  91. MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
  92. List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
  93. if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
  94. List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
  95. List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
  96. /**
  97. * {{first.DATA}}
  98. * 设备名称:{{keyword1.DATA}}
  99. * 告警内容:{{keyword2.DATA}}
  100. * 发生时间:{{keyword3.DATA}}
  101. * {{remark.DATA}}
  102. *
  103. */
  104. DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
  105. String deviceName = deviceInfo.getDeviceName();
  106. if (StrUtil.isEmpty(deviceName)) {
  107. deviceName = deviceId + "";
  108. } else {
  109. deviceName = deviceName + "(" + deviceId + ")";
  110. }
  111. Long userInfoId = merc.getUserInfoId();
  112. List<Long> userInfoIdList = new ArrayList<>();
  113. userInfoIdList.add(userInfoId);
  114. List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
  115. new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
  116. if (CollUtil.isNotEmpty(mercUserDevices)) {
  117. mercUserDevices.forEach(mud -> {
  118. Long deviceIdUser = mud.getDeviceId();
  119. if (deviceId == deviceIdUser) {
  120. userInfoIdList.add(mud.getUserId());
  121. }
  122. });
  123. }
  124. StringBuilder sbMsg = StrUtil.builder().append("设备")
  125. .append("【")
  126. .append(deviceName)
  127. .append("】")
  128. .append("发生故障:").append(msg).append(",发生时间:")
  129. .append(DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  130. //查询userinfo表的mpOpenId
  131. List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
  132. //配置了展示给商户才生成数据
  133. if (expShow2Merc) {
  134. userInfoList.forEach(u -> {
  135. String name = u.getName();
  136. //系统消息
  137. MsgSysMonitorNoticeDto.Save msgSys = new MsgSysMonitorNoticeDto.Save();
  138. msgSys.setUserId(u.getAuthorizeUserId());
  139. msgSys.setMsgType(MsgType.TYPE6.getCode()).setContent(sbMsg.toString())
  140. .setPubTime(LocalDateTime.now()).setPubState(2)
  141. .setSendTargets(name)
  142. .setPriority(msgConfig.getPriority())
  143. .setTitle(eventType + "-" + title)
  144. .setEventCode(code)
  145. .setPubUserId(-1).setTitle(MsgType.TYPE6.getDescription());
  146. msgSysMonitorNoticeService.save(msgSys);
  147. });
  148. }
  149. for (MsgConfigTestDto.BizData b : bizDataList) {
  150. String channelType = b.getChannelType();
  151. if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
  152. //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息, 查询微信公众号openID
  153. Set<String> mpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
  154. .map(UserInfoDto.Vo::getMpOpenid)
  155. .collect(Collectors.toSet());
  156. if (CollUtil.isEmpty(mpOpenIds)) {
  157. continue;
  158. }
  159. //微信公众号
  160. //扩展参数
  161. MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
  162. if (ObjectUtil.isNotEmpty(sysCodeConfigureRedis)) {
  163. String propose = sysCodeConfigureRedis.getPropose();
  164. if (StrUtil.isNotEmpty(propose) && propose.contains("http")) {
  165. bizExtraParam.setUrl(propose);
  166. }
  167. }
  168. Map<String, Object> params = MapUtil.newHashMap();
  169. if (MercAuthUtils.sendMsgToCM(mercCode)) {
  170. //设备编号:{{character_string2.DATA}} 故障类型:{{thing6.DATA}} 报障时间:{{time8.DATA}}
  171. params.put("character_string2", deviceId);
  172. params.put("thing6", msg);
  173. params.put("time8", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  174. } else {
  175. params.put("keyword1", deviceName);
  176. params.put("keyword2", msg);
  177. params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  178. }
  179. //商户管理员发
  180. b.setReceivers(mpOpenIds);
  181. b.setTemplateParams(params);
  182. b.setExtraParam(bizExtraParam);
  183. sendList.add(b);
  184. }
  185. }
  186. //是否可推送条件
  187. if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) {
  188. msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
  189. }
  190. }
  191. }
  192. }
  193. /**
  194. * 设备故障
  195. *
  196. * @param requestParams
  197. */
  198. @Override
  199. public void deviceFault(DeviceMqttDto.RequestParams requestParams) {
  200. DeviceFaultLogDto.MqttData mqttData = JSONUtil.toBean(requestParams.getData(), DeviceFaultLogDto.MqttData.class);
  201. // log.info("设备故障监控事件消费,接收mqtt data:{}", JSONUtil.toJsonPrettyStr(mqttData));
  202. if (mqttData != null) {
  203. DeviceEventMsgDto.Vo deviceEventMsg = mqttData.getDeviceEventMsg();
  204. Long mercId = deviceEventMsg.getMercId();
  205. String code = deviceEventMsg.getCode();
  206. Long deviceId = deviceEventMsg.getDeviceId();
  207. String msg = deviceEventMsg.getMsg();
  208. //加入故障日志
  209. DeviceFaultLog log = new DeviceFaultLog()
  210. .setMercId(mercId)
  211. .setDescribes(msg)
  212. .setEventCode(code)
  213. .setEventTime(deviceEventMsg.getCreateTime())
  214. .setDeviceId(deviceId);
  215. deviceFaultLogService.save(log);
  216. List<DeviceFaultMonitorConfigDto.Vo> configList = mqttData.getConfigList();
  217. // 按故障级别排序 降序
  218. List<DeviceFaultMonitorConfigDto.Vo> configSortedList = configList.stream()
  219. .sorted(Comparator.comparing(DeviceFaultMonitorConfigDto.Vo::getFaultLevel).reversed())
  220. .collect(Collectors.toList());
  221. for (DeviceFaultMonitorConfigDto.Vo config : configSortedList) {
  222. Integer eventNum = config.getEventNum();
  223. Long minuteNum = config.getMinuteNum();
  224. Integer faultLevel = config.getFaultLevel();
  225. Long monitorId = config.getMonitorId();
  226. Boolean toAfterSale = BooleanUtil.isTrue(config.getToAfterSale());
  227. // 获取当前时间
  228. Date now = DateUtil.date();
  229. // 计算最近n分钟的开始时间
  230. Date minutesAgo = DateUtil.offsetMinute(now, -minuteNum.intValue());
  231. //统计同类事件累计次数
  232. Long count = deviceFaultLogService.count(Wrappers.<DeviceFaultLog>lambdaQuery()
  233. .eq(DeviceFaultLog::getDeviceId, deviceId)
  234. .eq(DeviceFaultLog::getMercId, mercId)
  235. .eq(DeviceFaultLog::getEventCode, code)
  236. .ge(DeviceFaultLog::getEventTime, minutesAgo)
  237. );
  238. //达到阈值次数
  239. if (count != null && count.intValue() >= eventNum) {
  240. //故障表入库
  241. //是否已存在 未解决的
  242. DeviceFaultInfo deviceFaultInfo = deviceFaultInfoService.getOne(Wrappers.<DeviceFaultInfo>lambdaQuery()
  243. .eq(DeviceFaultInfo::getDeviceId, deviceId)
  244. .eq(DeviceFaultInfo::getMercId, mercId)
  245. .eq(DeviceFaultInfo::getEventCode, code)
  246. .eq(DeviceFaultInfo::getState, false)
  247. );
  248. if (deviceFaultInfo != null) {
  249. Integer faultLevel1 = deviceFaultInfo.getFaultLevel();
  250. if (faultLevel1.intValue() < faultLevel) {
  251. //小于,当前故障级别,更新
  252. deviceFaultInfo.setFaultLevel(faultLevel);
  253. deviceFaultInfo.setToAfterSale(toAfterSale);
  254. deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
  255. }
  256. //累加次数
  257. deviceFaultInfo.setEventNum(deviceFaultInfo.getEventNum() + 1).setMonitorId(monitorId);
  258. deviceFaultInfoService.updateById(deviceFaultInfo);
  259. } else {
  260. deviceFaultInfo = new DeviceFaultInfo()
  261. .setMercId(mercId)
  262. .setDeviceId(deviceId)
  263. .setDescribes(msg)
  264. .setFaultLevel(faultLevel)
  265. .setEventCode(code)
  266. .setEventNum(count.intValue())
  267. .setMonitorId(monitorId)
  268. .setToAfterSale(toAfterSale);
  269. deviceFaultInfoService.save(deviceFaultInfo);
  270. deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
  271. }
  272. //给售后推送故障信息
  273. if (toAfterSale.booleanValue()) {
  274. deviceFaultPushMsg(deviceEventMsg);
  275. }
  276. //已按最大故障级别排序,所以满足条件就退出循环
  277. break;
  278. }
  279. }
  280. }
  281. }
  282. /**
  283. * 故障推送给所有售后
  284. *
  285. * @param deviceEventMsg
  286. */
  287. public void deviceFaultPushMsg(DeviceEventMsgDto.Vo deviceEventMsg) {
  288. if (deviceEventMsg != null) {
  289. Long deviceId = deviceEventMsg.getDeviceId();
  290. Long mercId = deviceEventMsg.getMercId();
  291. LocalDateTime createTime = deviceEventMsg.getCreateTime();
  292. String msg = deviceEventMsg.getMsg();
  293. String code = deviceEventMsg.getCode();
  294. Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
  295. MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
  296. List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
  297. if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
  298. List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
  299. List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
  300. DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
  301. String deviceName = deviceInfo.getDeviceName();
  302. if (StrUtil.isEmpty(deviceName)) {
  303. deviceName = deviceId + "";
  304. } else {
  305. deviceName = deviceName + "(" + deviceId + ")";
  306. }
  307. // 获取售后及售后创建的角色用户
  308. UserInfoDto.SaleBackUsersVO saleBackUsersVO = R.feignCheckData(userInfoService.getSaleBackUsers());
  309. List<String> mpOpenIds = saleBackUsersVO.getMpOpenIds();
  310. //开门超时事件增加商户推送 ERR0020
  311. if ("ERR0020".equals(code)) {
  312. MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
  313. Long userInfoId = merc.getUserInfoId();
  314. List<Long> userInfoIdList = new ArrayList<>();
  315. userInfoIdList.add(userInfoId);
  316. List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
  317. new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
  318. if (CollUtil.isNotEmpty(mercUserDevices)) {
  319. mercUserDevices.forEach(mud -> {
  320. Long deviceIdUser = mud.getDeviceId();
  321. if (deviceId == deviceIdUser) {
  322. userInfoIdList.add(mud.getUserId());
  323. }
  324. });
  325. }
  326. List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
  327. if (CollUtil.isNotEmpty(userInfoList)) {
  328. Set<String> mercMpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
  329. .map(UserInfoDto.Vo::getMpOpenid)
  330. .collect(Collectors.toSet());
  331. mpOpenIds.addAll(mercMpOpenIds);
  332. }
  333. }
  334. if (CollUtil.isNotEmpty(mpOpenIds)) {
  335. Set<String> mpIds = new HashSet<>(mpOpenIds);
  336. for (MsgConfigTestDto.BizData b : bizDataList) {
  337. String channelType = b.getChannelType();
  338. if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
  339. //微信公众号
  340. //扩展参数
  341. MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
  342. Map<String, Object> params = MapUtil.newHashMap();
  343. params.put("keyword1", deviceName);
  344. params.put("keyword2", msg);
  345. params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  346. //商户管理员发
  347. b.setReceivers(mpIds);
  348. b.setTemplateParams(params);
  349. b.setExtraParam(bizExtraParam);
  350. sendList.add(b);
  351. }
  352. }
  353. }
  354. //是否可推送条件
  355. if (CollUtil.isNotEmpty(sendList)) {
  356. msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
  357. }
  358. }
  359. }
  360. }
  361. /**
  362. * 设备联网状态
  363. *
  364. * @param requestParams
  365. * @param netState
  366. */
  367. private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) {
  368. JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
  369. String clientid = jsonObject.getStr("clientid");
  370. Long deviceId;
  371. try {
  372. deviceId = Long.valueOf(clientid);
  373. } catch (Exception e) {
  374. return;
  375. }
  376. //发布事件
  377. DeviceEvent.NetWorkEvent netWorkEvent = new DeviceEvent.NetWorkEvent()
  378. .setNetState(netState);
  379. netWorkEvent.setValue(deviceId);
  380. EventUtils.push(() -> netWorkEvent);
  381. }
  382. }