DeviceMqttConsumerImpl.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package com.xy.service;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.collection.CollUtil;
  4. import cn.hutool.core.date.DatePattern;
  5. import cn.hutool.core.date.DateUtil;
  6. import cn.hutool.core.map.MapUtil;
  7. import cn.hutool.core.util.ObjectUtil;
  8. import cn.hutool.core.util.StrUtil;
  9. import cn.hutool.json.JSONObject;
  10. import cn.hutool.json.JSONUtil;
  11. import com.xy.config.DeviceThreadPoolConfig;
  12. import com.xy.device.EnumDeviceOnlineStatus;
  13. import com.xy.dto.*;
  14. import com.xy.dto.be.MercDto;
  15. import com.xy.dto.be.MercUserDeviceDto;
  16. import com.xy.entity.DeviceEventMsg;
  17. import com.xy.entity.DeviceInfo;
  18. import com.xy.entity.SysCodeConfigureRedis;
  19. import com.xy.enums.ChannelType;
  20. import com.xy.enums.MsgConfigId;
  21. import com.xy.enums.MsgType;
  22. import com.xy.service.be.MercFeignService;
  23. import com.xy.utils.R;
  24. import com.xy.utils.SysCodeConfigureUtils;
  25. import com.xy.utils.SysDictUtils;
  26. import io.swagger.annotations.Api;
  27. import io.swagger.annotations.ApiOperation;
  28. import lombok.AllArgsConstructor;
  29. import org.springframework.scheduling.annotation.Async;
  30. import org.springframework.stereotype.Service;
  31. import java.time.LocalDateTime;
  32. import java.util.ArrayList;
  33. import java.util.List;
  34. import java.util.Map;
  35. import java.util.Set;
  36. import java.util.stream.Collectors;
  37. @Service
  38. @AllArgsConstructor
  39. @Api(tags = "设备消费者")
  40. public class DeviceMqttConsumerImpl implements DeviceMqttConsumer {
  41. private DeviceStatusServiceImpl deviceStatusService;
  42. private DeviceNetRecordServiceImpl deviceNetRecordService;
  43. private MsgSendApiService msgSendApiService;
  44. private DeviceInfoServiceImpl deviceInfoService;
  45. private MercFeignService mercFeignService;
  46. private UserInfoService userInfoService;
  47. private MsgSysMonitorNoticeService msgSysMonitorNoticeService;
  48. @Override
  49. @ApiOperation("设备在线")
  50. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  51. public void connected(DeviceMqttDto.RequestParams requestParams) {
  52. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
  53. netWork(requestParams, netState);
  54. }
  55. @Override
  56. @ApiOperation("设备离线")
  57. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  58. public void disconnect(DeviceMqttDto.RequestParams requestParams) {
  59. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
  60. netWork(requestParams, netState);
  61. }
  62. @Override
  63. @ApiOperation("设备消息推送")
  64. public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) {
  65. DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class);
  66. if (deviceEventMsg != null) {
  67. Long deviceId = deviceEventMsg.getDeviceId();
  68. Long mercId = deviceEventMsg.getMercId();
  69. LocalDateTime createTime = deviceEventMsg.getCreateTime();
  70. String msg = deviceEventMsg.getMsg();
  71. String code = deviceEventMsg.getCode();
  72. SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(code);
  73. String expand = sysCodeConfigureRedis.getExpand();
  74. if (StrUtil.isEmpty(expand)) {
  75. return;
  76. }
  77. String paterCode = sysCodeConfigureRedis.getPaterCode();
  78. SysCodeConfigureRedis parentCodeConfig = SysCodeConfigureUtils.get(paterCode);
  79. String eventType = parentCodeConfig.getMsg();
  80. String title = sysCodeConfigureRedis.getTitle();
  81. //是否展示给商户
  82. boolean expShow2Merc = SysCodeConfigureUtils.getExpShow2Merc(expand);
  83. //是否推送给商户
  84. boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand);
  85. //是否推送时间段
  86. boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand);
  87. Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
  88. MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
  89. List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
  90. if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
  91. List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
  92. List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
  93. /**
  94. * {{first.DATA}}
  95. * 设备名称:{{keyword1.DATA}}
  96. * 告警内容:{{keyword2.DATA}}
  97. * 发生时间:{{keyword3.DATA}}
  98. * {{remark.DATA}}
  99. *
  100. */
  101. DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
  102. String deviceName = deviceInfo.getDeviceName();
  103. if (StrUtil.isEmpty(deviceName)) {
  104. deviceName = deviceId + "";
  105. } else {
  106. deviceName = deviceName + "(" + deviceId + ")";
  107. }
  108. MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
  109. Long userInfoId = merc.getUserInfoId();
  110. List<Long> userInfoIdList = new ArrayList<>();
  111. userInfoIdList.add(userInfoId);
  112. List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
  113. new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
  114. if (CollUtil.isNotEmpty(mercUserDevices)) {
  115. mercUserDevices.forEach(mud -> {
  116. Long deviceIdUser = mud.getDeviceId();
  117. if (deviceId == deviceIdUser) {
  118. userInfoIdList.add(mud.getUserId());
  119. }
  120. });
  121. }
  122. StringBuilder sbMsg = StrUtil.builder().append("设备")
  123. .append("【")
  124. .append(deviceName)
  125. .append("】")
  126. .append("发生故障:").append(msg).append(",发生时间:")
  127. .append(DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  128. //查询userinfo表的mpOpenId
  129. List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
  130. //配置了展示给商户才生成数据
  131. if (expShow2Merc) {
  132. userInfoList.forEach(u -> {
  133. String name = u.getName();
  134. //系统消息
  135. MsgSysMonitorNoticeDto.Save msgSys = new MsgSysMonitorNoticeDto.Save();
  136. msgSys.setUserId(u.getAuthorizeUserId());
  137. msgSys.setMsgType(MsgType.TYPE6.getCode()).setContent(sbMsg.toString())
  138. .setPubTime(LocalDateTime.now()).setPubState(2)
  139. .setSendTargets(name)
  140. .setPriority(msgConfig.getPriority())
  141. .setTitle(eventType + "-" + title)
  142. .setEventCode(code)
  143. .setPubUserId(-1).setTitle(MsgType.TYPE6.getDescription());
  144. msgSysMonitorNoticeService.save(msgSys);
  145. });
  146. }
  147. for (MsgConfigTestDto.BizData b : bizDataList) {
  148. String channelType = b.getChannelType();
  149. if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
  150. //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息, 查询微信公众号openID
  151. Set<String> mpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
  152. .map(UserInfoDto.Vo::getMpOpenid)
  153. .collect(Collectors.toSet());
  154. if (CollUtil.isEmpty(mpOpenIds)) {
  155. continue;
  156. }
  157. //微信公众号
  158. //扩展参数
  159. MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
  160. if (ObjectUtil.isNotEmpty(sysCodeConfigureRedis)) {
  161. String propose = sysCodeConfigureRedis.getPropose();
  162. if (StrUtil.isNotEmpty(propose) && propose.contains("http")) {
  163. bizExtraParam.setUrl(propose);
  164. }
  165. }
  166. Map<String, Object> params = MapUtil.newHashMap();
  167. params.put("keyword1", deviceName);
  168. params.put("keyword2", msg);
  169. params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  170. //商户管理员发
  171. b.setReceivers(mpOpenIds);
  172. b.setTemplateParams(params);
  173. b.setExtraParam(bizExtraParam);
  174. sendList.add(b);
  175. }
  176. }
  177. //是否可推送条件
  178. if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) {
  179. msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
  180. }
  181. }
  182. }
  183. }
  184. /**
  185. * 设备联网状态
  186. *
  187. * @param requestParams
  188. * @param netState
  189. */
  190. private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) {
  191. JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
  192. String clientid = jsonObject.getStr("clientid");
  193. Long deviceId;
  194. try {
  195. deviceId = Long.valueOf(clientid);
  196. } catch (Exception e) {
  197. return;
  198. }
  199. //上报状态
  200. DeviceStatusDto.Up up = new DeviceStatusDto.Up().setDeviceId(deviceId);
  201. up.setNetState(netState);
  202. deviceStatusService.up(up).getCode();
  203. //添加联网记录
  204. DeviceNetRecordDto.Save save = new DeviceNetRecordDto.Save()
  205. .setDeviceId(deviceId);
  206. save.setNetStatus(netState);
  207. deviceNetRecordService.save(save);
  208. }
  209. }