DeviceMqttConsumerImpl.java 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. package com.xy.service;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.collection.CollUtil;
  4. import cn.hutool.core.date.DatePattern;
  5. import cn.hutool.core.date.DateUtil;
  6. import cn.hutool.core.map.MapUtil;
  7. import cn.hutool.core.util.BooleanUtil;
  8. import cn.hutool.core.util.ObjectUtil;
  9. import cn.hutool.core.util.StrUtil;
  10. import cn.hutool.json.JSONObject;
  11. import cn.hutool.json.JSONUtil;
  12. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  13. import com.xy.config.DeviceThreadPoolConfig;
  14. import com.xy.device.EnumDeviceOnlineStatus;
  15. import com.xy.dto.*;
  16. import com.xy.dto.be.MercDto;
  17. import com.xy.dto.be.MercUserDeviceDto;
  18. import com.xy.entity.*;
  19. import com.xy.enums.ChannelType;
  20. import com.xy.enums.MsgConfigId;
  21. import com.xy.enums.MsgType;
  22. import com.xy.service.be.MercFeignService;
  23. import com.xy.utils.R;
  24. import com.xy.utils.SysCodeConfigureUtils;
  25. import com.xy.utils.SysDictUtils;
  26. import io.swagger.annotations.Api;
  27. import io.swagger.annotations.ApiOperation;
  28. import lombok.AllArgsConstructor;
  29. import lombok.extern.slf4j.Slf4j;
  30. import org.springframework.scheduling.annotation.Async;
  31. import org.springframework.stereotype.Service;
  32. import java.time.LocalDateTime;
  33. import java.util.*;
  34. import java.util.stream.Collectors;
  35. @Slf4j
  36. @Service
  37. @AllArgsConstructor
  38. @Api(tags = "设备消费者")
  39. public class DeviceMqttConsumerImpl implements DeviceMqttConsumer {
  40. private DeviceStatusServiceImpl deviceStatusService;
  41. private DeviceNetRecordServiceImpl deviceNetRecordService;
  42. private MsgSendApiService msgSendApiService;
  43. private DeviceInfoServiceImpl deviceInfoService;
  44. private MercFeignService mercFeignService;
  45. private UserInfoService userInfoService;
  46. private MsgSysMonitorNoticeService msgSysMonitorNoticeService;
  47. private DeviceFaultLogServiceImpl deviceFaultLogService;
  48. private DeviceFaultInfoServiceImpl deviceFaultInfoService;
  49. @Override
  50. @ApiOperation("设备在线")
  51. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  52. public void connected(DeviceMqttDto.RequestParams requestParams) {
  53. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
  54. netWork(requestParams, netState);
  55. }
  56. @Override
  57. @ApiOperation("设备离线")
  58. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  59. public void disconnect(DeviceMqttDto.RequestParams requestParams) {
  60. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
  61. netWork(requestParams, netState);
  62. }
  63. @Override
  64. @ApiOperation("设备消息推送")
  65. public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) {
  66. DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class);
  67. if (deviceEventMsg != null) {
  68. Long deviceId = deviceEventMsg.getDeviceId();
  69. Long mercId = deviceEventMsg.getMercId();
  70. LocalDateTime createTime = deviceEventMsg.getCreateTime();
  71. String msg = deviceEventMsg.getMsg();
  72. String code = deviceEventMsg.getCode();
  73. SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(code);
  74. String expand = sysCodeConfigureRedis.getExpand();
  75. if (StrUtil.isEmpty(expand)) {
  76. return;
  77. }
  78. String paterCode = sysCodeConfigureRedis.getPaterCode();
  79. SysCodeConfigureRedis parentCodeConfig = SysCodeConfigureUtils.get(paterCode);
  80. String eventType = parentCodeConfig.getMsg();
  81. String title = sysCodeConfigureRedis.getTitle();
  82. //是否展示给商户
  83. boolean expShow2Merc = SysCodeConfigureUtils.getExpShow2Merc(expand);
  84. //是否推送给商户
  85. boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand);
  86. //是否推送时间段
  87. boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand);
  88. Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
  89. MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
  90. List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
  91. if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
  92. List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
  93. List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
  94. /**
  95. * {{first.DATA}}
  96. * 设备名称:{{keyword1.DATA}}
  97. * 告警内容:{{keyword2.DATA}}
  98. * 发生时间:{{keyword3.DATA}}
  99. * {{remark.DATA}}
  100. *
  101. */
  102. DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
  103. String deviceName = deviceInfo.getDeviceName();
  104. if (StrUtil.isEmpty(deviceName)) {
  105. deviceName = deviceId + "";
  106. } else {
  107. deviceName = deviceName + "(" + deviceId + ")";
  108. }
  109. MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
  110. Long userInfoId = merc.getUserInfoId();
  111. List<Long> userInfoIdList = new ArrayList<>();
  112. userInfoIdList.add(userInfoId);
  113. List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
  114. new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
  115. if (CollUtil.isNotEmpty(mercUserDevices)) {
  116. mercUserDevices.forEach(mud -> {
  117. Long deviceIdUser = mud.getDeviceId();
  118. if (deviceId == deviceIdUser) {
  119. userInfoIdList.add(mud.getUserId());
  120. }
  121. });
  122. }
  123. StringBuilder sbMsg = StrUtil.builder().append("设备")
  124. .append("【")
  125. .append(deviceName)
  126. .append("】")
  127. .append("发生故障:").append(msg).append(",发生时间:")
  128. .append(DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  129. //查询userinfo表的mpOpenId
  130. List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
  131. //配置了展示给商户才生成数据
  132. if (expShow2Merc) {
  133. userInfoList.forEach(u -> {
  134. String name = u.getName();
  135. //系统消息
  136. MsgSysMonitorNoticeDto.Save msgSys = new MsgSysMonitorNoticeDto.Save();
  137. msgSys.setUserId(u.getAuthorizeUserId());
  138. msgSys.setMsgType(MsgType.TYPE6.getCode()).setContent(sbMsg.toString())
  139. .setPubTime(LocalDateTime.now()).setPubState(2)
  140. .setSendTargets(name)
  141. .setPriority(msgConfig.getPriority())
  142. .setTitle(eventType + "-" + title)
  143. .setEventCode(code)
  144. .setPubUserId(-1).setTitle(MsgType.TYPE6.getDescription());
  145. msgSysMonitorNoticeService.save(msgSys);
  146. });
  147. }
  148. for (MsgConfigTestDto.BizData b : bizDataList) {
  149. String channelType = b.getChannelType();
  150. if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
  151. //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息, 查询微信公众号openID
  152. Set<String> mpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
  153. .map(UserInfoDto.Vo::getMpOpenid)
  154. .collect(Collectors.toSet());
  155. if (CollUtil.isEmpty(mpOpenIds)) {
  156. continue;
  157. }
  158. //微信公众号
  159. //扩展参数
  160. MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
  161. if (ObjectUtil.isNotEmpty(sysCodeConfigureRedis)) {
  162. String propose = sysCodeConfigureRedis.getPropose();
  163. if (StrUtil.isNotEmpty(propose) && propose.contains("http")) {
  164. bizExtraParam.setUrl(propose);
  165. }
  166. }
  167. Map<String, Object> params = MapUtil.newHashMap();
  168. params.put("keyword1", deviceName);
  169. params.put("keyword2", msg);
  170. params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  171. //商户管理员发
  172. b.setReceivers(mpOpenIds);
  173. b.setTemplateParams(params);
  174. b.setExtraParam(bizExtraParam);
  175. sendList.add(b);
  176. }
  177. }
  178. //是否可推送条件
  179. if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) {
  180. msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
  181. }
  182. }
  183. }
  184. }
  185. /**
  186. * 设备故障
  187. *
  188. * @param requestParams
  189. */
  190. @Override
  191. public void deviceFault(DeviceMqttDto.RequestParams requestParams) {
  192. DeviceFaultLogDto.MqttData mqttData = JSONUtil.toBean(requestParams.getData(), DeviceFaultLogDto.MqttData.class);
  193. // log.info("设备故障监控事件消费,接收mqtt data:{}", JSONUtil.toJsonPrettyStr(mqttData));
  194. if (mqttData != null) {
  195. DeviceEventMsgDto.Vo deviceEventMsg = mqttData.getDeviceEventMsg();
  196. Long mercId = deviceEventMsg.getMercId();
  197. String code = deviceEventMsg.getCode();
  198. Long deviceId = deviceEventMsg.getDeviceId();
  199. String msg = deviceEventMsg.getMsg();
  200. //加入故障日志
  201. DeviceFaultLog log = new DeviceFaultLog()
  202. .setMercId(mercId)
  203. .setDescribes(msg)
  204. .setEventCode(code)
  205. .setEventTime(deviceEventMsg.getCreateTime())
  206. .setDeviceId(deviceId);
  207. deviceFaultLogService.save(log);
  208. List<DeviceFaultMonitorConfigDto.Vo> configList = mqttData.getConfigList();
  209. // 按故障级别排序 降序
  210. List<DeviceFaultMonitorConfigDto.Vo> configSortedList = configList.stream()
  211. .sorted(Comparator.comparing(DeviceFaultMonitorConfigDto.Vo::getFaultLevel).reversed())
  212. .collect(Collectors.toList());
  213. for (DeviceFaultMonitorConfigDto.Vo config : configSortedList) {
  214. Integer eventNum = config.getEventNum();
  215. Long minuteNum = config.getMinuteNum();
  216. Integer faultLevel = config.getFaultLevel();
  217. Long monitorId = config.getMonitorId();
  218. Boolean toAfterSale = BooleanUtil.isTrue(config.getToAfterSale());
  219. // 获取当前时间
  220. Date now = DateUtil.date();
  221. // 计算最近n分钟的开始时间
  222. Date minutesAgo = DateUtil.offsetMinute(now, -minuteNum.intValue());
  223. //统计同类事件累计次数
  224. Long count = deviceFaultLogService.count(Wrappers.<DeviceFaultLog>lambdaQuery()
  225. .eq(DeviceFaultLog::getDeviceId, deviceId)
  226. .eq(DeviceFaultLog::getMercId, mercId)
  227. .eq(DeviceFaultLog::getEventCode, code)
  228. .ge(DeviceFaultLog::getEventTime, minutesAgo)
  229. );
  230. //达到阈值次数
  231. if (count != null && count.intValue() >= eventNum) {
  232. //故障表入库
  233. //是否已存在 未解决的
  234. DeviceFaultInfo deviceFaultInfo = deviceFaultInfoService.getOne(Wrappers.<DeviceFaultInfo>lambdaQuery()
  235. .eq(DeviceFaultInfo::getDeviceId, deviceId)
  236. .eq(DeviceFaultInfo::getMercId, mercId)
  237. .eq(DeviceFaultInfo::getEventCode, code)
  238. .eq(DeviceFaultInfo::getState, false)
  239. );
  240. if (deviceFaultInfo != null) {
  241. Integer faultLevel1 = deviceFaultInfo.getFaultLevel();
  242. if (faultLevel1.intValue() < faultLevel) {
  243. //小于,当前故障级别,更新
  244. deviceFaultInfo.setFaultLevel(faultLevel);
  245. deviceFaultInfo.setToAfterSale(toAfterSale);
  246. deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
  247. }
  248. //累加次数
  249. deviceFaultInfo.setEventNum(deviceFaultInfo.getEventNum() + 1).setMonitorId(monitorId);
  250. deviceFaultInfoService.updateById(deviceFaultInfo);
  251. } else {
  252. deviceFaultInfo = new DeviceFaultInfo()
  253. .setMercId(mercId)
  254. .setDeviceId(deviceId)
  255. .setDescribes(msg)
  256. .setFaultLevel(faultLevel)
  257. .setEventCode(code)
  258. .setEventNum(count.intValue())
  259. .setMonitorId(monitorId)
  260. .setToAfterSale(toAfterSale);
  261. deviceFaultInfoService.save(deviceFaultInfo);
  262. deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
  263. }
  264. //给售后推送故障信息
  265. if (toAfterSale.booleanValue()) {
  266. deviceFaultPushMsg(deviceEventMsg);
  267. }
  268. //已按最大故障级别排序,所以满足条件就退出循环
  269. break;
  270. }
  271. }
  272. }
  273. }
  274. /**
  275. * 故障推送给所有售后
  276. *
  277. * @param deviceEventMsg
  278. */
  279. public void deviceFaultPushMsg(DeviceEventMsgDto.Vo deviceEventMsg) {
  280. if (deviceEventMsg != null) {
  281. Long deviceId = deviceEventMsg.getDeviceId();
  282. Long mercId = deviceEventMsg.getMercId();
  283. LocalDateTime createTime = deviceEventMsg.getCreateTime();
  284. String msg = deviceEventMsg.getMsg();
  285. String code = deviceEventMsg.getCode();
  286. Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
  287. MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
  288. List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
  289. if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
  290. List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
  291. List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
  292. DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
  293. String deviceName = deviceInfo.getDeviceName();
  294. if (StrUtil.isEmpty(deviceName)) {
  295. deviceName = deviceId + "";
  296. } else {
  297. deviceName = deviceName + "(" + deviceId + ")";
  298. }
  299. // 获取售后及售后创建的角色用户
  300. UserInfoDto.SaleBackUsersVO saleBackUsersVO = R.feignCheckData(userInfoService.getSaleBackUsers());
  301. List<String> mpOpenIds = saleBackUsersVO.getMpOpenIds();
  302. //开门超时事件增加商户推送 ERR0020
  303. if ("ERR0020".equals(code)) {
  304. MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
  305. Long userInfoId = merc.getUserInfoId();
  306. List<Long> userInfoIdList = new ArrayList<>();
  307. userInfoIdList.add(userInfoId);
  308. List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
  309. new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
  310. if (CollUtil.isNotEmpty(mercUserDevices)) {
  311. mercUserDevices.forEach(mud -> {
  312. Long deviceIdUser = mud.getDeviceId();
  313. if (deviceId == deviceIdUser) {
  314. userInfoIdList.add(mud.getUserId());
  315. }
  316. });
  317. }
  318. List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
  319. if (CollUtil.isNotEmpty(userInfoList)) {
  320. Set<String> mercMpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
  321. .map(UserInfoDto.Vo::getMpOpenid)
  322. .collect(Collectors.toSet());
  323. mpOpenIds.addAll(mercMpOpenIds);
  324. }
  325. }
  326. if (CollUtil.isNotEmpty(mpOpenIds)) {
  327. Set<String> mpIds = new HashSet<>(mpOpenIds);
  328. for (MsgConfigTestDto.BizData b : bizDataList) {
  329. String channelType = b.getChannelType();
  330. if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
  331. //微信公众号
  332. //扩展参数
  333. MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
  334. Map<String, Object> params = MapUtil.newHashMap();
  335. params.put("keyword1", deviceName);
  336. params.put("keyword2", msg);
  337. params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  338. //商户管理员发
  339. b.setReceivers(mpIds);
  340. b.setTemplateParams(params);
  341. b.setExtraParam(bizExtraParam);
  342. sendList.add(b);
  343. }
  344. }
  345. }
  346. //是否可推送条件
  347. if (CollUtil.isNotEmpty(sendList)) {
  348. msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
  349. }
  350. }
  351. }
  352. }
  353. /**
  354. * 设备联网状态
  355. *
  356. * @param requestParams
  357. * @param netState
  358. */
  359. private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) {
  360. JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
  361. String clientid = jsonObject.getStr("clientid");
  362. Long deviceId;
  363. try {
  364. deviceId = Long.valueOf(clientid);
  365. } catch (Exception e) {
  366. return;
  367. }
  368. //上报状态
  369. DeviceStatusDto.Up up = new DeviceStatusDto.Up().setDeviceId(deviceId);
  370. up.setNetState(netState);
  371. deviceStatusService.up(up).getCode();
  372. //添加联网记录
  373. DeviceNetRecordDto.Save save = new DeviceNetRecordDto.Save()
  374. .setDeviceId(deviceId);
  375. save.setNetStatus(netState);
  376. deviceNetRecordService.save(save);
  377. }
  378. }