DeviceMqttConsumerImpl.java 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. package com.xy.service;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.collection.CollUtil;
  4. import cn.hutool.core.date.DatePattern;
  5. import cn.hutool.core.date.DateUtil;
  6. import cn.hutool.core.date.LocalDateTimeUtil;
  7. import cn.hutool.core.map.MapUtil;
  8. import cn.hutool.core.util.BooleanUtil;
  9. import cn.hutool.core.util.ObjectUtil;
  10. import cn.hutool.core.util.StrUtil;
  11. import cn.hutool.json.JSONObject;
  12. import cn.hutool.json.JSONUtil;
  13. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  14. import com.xy.config.DeviceThreadPoolConfig;
  15. import com.xy.consts.MercConstant;
  16. import com.xy.device.EnumDeviceOnlineStatus;
  17. import com.xy.dto.*;
  18. import com.xy.dto.be.MercDto;
  19. import com.xy.dto.be.MercUserDeviceDto;
  20. import com.xy.entity.*;
  21. import com.xy.enums.ChannelType;
  22. import com.xy.enums.MsgConfigId;
  23. import com.xy.enums.MsgType;
  24. import com.xy.enums.SmsSceneEnum;
  25. import com.xy.event.DeviceEvent;
  26. import com.xy.service.be.MercFeignService;
  27. import com.xy.utils.*;
  28. import com.xy.utils.enums.DeviceErrorRecordTypesEnum;
  29. import io.swagger.annotations.Api;
  30. import io.swagger.annotations.ApiOperation;
  31. import lombok.AllArgsConstructor;
  32. import lombok.extern.slf4j.Slf4j;
  33. import org.springframework.scheduling.annotation.Async;
  34. import org.springframework.stereotype.Service;
  35. import java.time.LocalDateTime;
  36. import java.util.*;
  37. import java.util.stream.Collectors;
  38. @Slf4j
  39. @Service
  40. @AllArgsConstructor
  41. @Api(tags = "设备消费者")
  42. public class DeviceMqttConsumerImpl implements DeviceMqttConsumer {
  43. private AliSmsService aliSmsService;
  44. private MsgSendApiService msgSendApiService;
  45. private DeviceInfoServiceImpl deviceInfoService;
  46. private MercFeignService mercFeignService;
  47. private UserInfoService userInfoService;
  48. private MsgSysMonitorNoticeService msgSysMonitorNoticeService;
  49. private DeviceFaultLogServiceImpl deviceFaultLogService;
  50. private DeviceFaultInfoServiceImpl deviceFaultInfoService;
  51. private MercSmsSubscriptionService mercSmsSubscriptionService;
  52. private MercSmsPackageService mercSmsService;
  53. @Override
  54. @ApiOperation("设备在线")
  55. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  56. public void connected(DeviceMqttDto.RequestParams requestParams) {
  57. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.CONNECTED.getCode(), Integer.class);
  58. netWork(requestParams, netState);
  59. }
  60. @Override
  61. @ApiOperation("设备离线")
  62. @Async(DeviceThreadPoolConfig.DEVICE_NETWORK_POLL)
  63. public void disconnect(DeviceMqttDto.RequestParams requestParams) {
  64. Integer netState = SysDictUtils.getValue(EnumDeviceOnlineStatus.Code.CODE.getCode(), EnumDeviceOnlineStatus.DISCONNECT.getCode(), Integer.class);
  65. netWork(requestParams, netState);
  66. }
  67. @Override
  68. @ApiOperation("设备消息推送")
  69. public void devicePushMsg(DeviceMqttDto.RequestParams requestParams) {
  70. log.info("设备消息推送MQTT:{}", JSONUtil.toJsonPrettyStr(requestParams));
  71. DeviceEventMsg deviceEventMsg = JSONUtil.toBean(requestParams.getData(), DeviceEventMsg.class);
  72. if (deviceEventMsg != null) {
  73. Long deviceId = deviceEventMsg.getDeviceId();
  74. Long mercId = deviceEventMsg.getMercId();
  75. MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
  76. String mercCode = merc.getMercCode();
  77. LocalDateTime createTime = deviceEventMsg.getCreateTime();
  78. String msg = deviceEventMsg.getMsg();
  79. String code = deviceEventMsg.getCode();
  80. SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(code);
  81. String expand = sysCodeConfigureRedis.getExpand();
  82. if (StrUtil.isEmpty(expand)) {
  83. return;
  84. }
  85. String paterCode = sysCodeConfigureRedis.getPaterCode();
  86. SysCodeConfigureRedis parentCodeConfig = SysCodeConfigureUtils.get(paterCode);
  87. String eventType = parentCodeConfig.getMsg();
  88. String title = sysCodeConfigureRedis.getTitle();
  89. //是否展示给商户
  90. boolean expShow2Merc = SysCodeConfigureUtils.getExpShow2Merc(expand);
  91. //是否推送给商户
  92. boolean expPush2Merc = SysCodeConfigureUtils.getExpPush2Merc(expand);
  93. //是否推送时间段
  94. boolean expCanPush = SysCodeConfigureUtils.getExpCanPush(expand);
  95. Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
  96. if (MercAuthUtils.sendMsgToCM(mercCode)) {
  97. configId = MsgConfigId.CM_DEVICE_EXCEPTION.getId();
  98. }
  99. DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
  100. MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
  101. List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
  102. if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
  103. List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
  104. List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
  105. /**
  106. * {{first.DATA}}
  107. * 设备名称:{{keyword1.DATA}}
  108. * 告警内容:{{keyword2.DATA}}
  109. * 发生时间:{{keyword3.DATA}}
  110. * {{remark.DATA}}
  111. *
  112. */
  113. String deviceName = deviceInfo.getDeviceName();
  114. if (StrUtil.isEmpty(deviceName)) {
  115. deviceName = deviceId + "";
  116. } else {
  117. deviceName = deviceName + "(" + deviceId + ")";
  118. }
  119. Long userInfoId = merc.getUserInfoId();
  120. List<Long> userInfoIdList = new ArrayList<>();
  121. userInfoIdList.add(userInfoId);
  122. List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
  123. new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
  124. if (CollUtil.isNotEmpty(mercUserDevices)) {
  125. mercUserDevices.forEach(mud -> {
  126. Long deviceIdUser = mud.getDeviceId();
  127. if (Objects.equals(deviceId, deviceIdUser)) {
  128. userInfoIdList.add(mud.getUserId());
  129. }
  130. });
  131. }
  132. StringBuilder sbMsg = StrUtil.builder().append("设备")
  133. .append("【")
  134. .append(deviceName)
  135. .append("】")
  136. .append("发生故障:").append(msg).append(",发生时间:")
  137. .append(DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  138. //查询userinfo表的mpOpenId
  139. List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
  140. //配置了展示给商户才生成数据
  141. if (expShow2Merc) {
  142. userInfoList.forEach(u -> {
  143. String name = u.getName();
  144. //系统消息
  145. MsgSysMonitorNoticeDto.Save msgSys = new MsgSysMonitorNoticeDto.Save();
  146. msgSys.setUserId(u.getAuthorizeUserId());
  147. msgSys.setMsgType(MsgType.TYPE6.getCode()).setContent(sbMsg.toString())
  148. .setPubTime(LocalDateTime.now()).setPubState(2)
  149. .setSendTargets(name)
  150. .setPriority(msgConfig.getPriority())
  151. .setTitle(eventType + "-" + title)
  152. .setEventCode(code)
  153. .setPubUserId(-1).setTitle(MsgType.TYPE6.getDescription());
  154. msgSysMonitorNoticeService.save(msgSys);
  155. });
  156. }
  157. for (MsgConfigTestDto.BizData b : bizDataList) {
  158. String channelType = b.getChannelType();
  159. if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
  160. //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息, 查询微信公众号openID
  161. Set<String> mpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
  162. .map(UserInfoDto.Vo::getMpOpenid)
  163. .collect(Collectors.toSet());
  164. if (CollUtil.isEmpty(mpOpenIds)) {
  165. continue;
  166. }
  167. //微信公众号
  168. //扩展参数
  169. MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
  170. if (ObjectUtil.isNotEmpty(sysCodeConfigureRedis)) {
  171. String propose = sysCodeConfigureRedis.getPropose();
  172. if (StrUtil.isNotEmpty(propose) && propose.contains("http")) {
  173. bizExtraParam.setUrl(propose);
  174. }
  175. }
  176. Map<String, Object> params = MapUtil.newHashMap();
  177. if (MercAuthUtils.sendMsgToCM(mercCode)) {
  178. //设备编号:{{character_string2.DATA}} 故障类型:{{thing6.DATA}} 报障时间:{{time8.DATA}}
  179. params.put("character_string2", deviceId);
  180. params.put("thing6", msg);
  181. params.put("time8", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  182. } else {
  183. params.put("keyword1", deviceName);
  184. params.put("keyword2", msg);
  185. params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  186. }
  187. //商户管理员发
  188. b.setReceivers(mpOpenIds);
  189. b.setTemplateParams(params);
  190. b.setExtraParam(bizExtraParam);
  191. sendList.add(b);
  192. }
  193. }
  194. //是否可推送条件
  195. if (CollUtil.isNotEmpty(sendList) && expPush2Merc && expCanPush) {
  196. msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
  197. }
  198. }
  199. //短信通知
  200. log.info("设备告警,短信通知处理开始。。。");
  201. deviceSmsPushMsg(merc,deviceInfo,createTime,code);
  202. }
  203. }
  204. /**
  205. * 商户设备异常短信通知
  206. * @param merc
  207. * @param deviceInfo
  208. * @param createTime
  209. * @param eventCode
  210. */
  211. public void deviceSmsPushMsg(MercDto.Vo merc,DeviceInfo deviceInfo,LocalDateTime createTime,String eventCode) {
  212. Long deviceId = deviceInfo.getDeviceId();
  213. String deviceName = deviceInfo.getDeviceName();
  214. if (StrUtil.isEmpty(deviceName)) {
  215. deviceName = deviceId + "";
  216. } else {
  217. deviceName = deviceName + "(" + deviceId + ")";
  218. }
  219. String exceptionType="";
  220. //商家是否订阅
  221. if(DeviceErrorRecordTypesEnum.T.getCode().equals(eventCode)) {
  222. exceptionType = "温度";
  223. }else if(DeviceErrorRecordTypesEnum.NET.getCode().equals(eventCode)){
  224. exceptionType="网络";
  225. }else if(DeviceErrorRecordTypesEnum.DOOR_LOCK.getCode().equals(eventCode)){
  226. exceptionType="门锁";
  227. }else if(DeviceErrorRecordTypesEnum.LIGHT.getCode().equals(eventCode)){
  228. exceptionType="灯光";
  229. }else if(DeviceErrorRecordTypesEnum.CAMERA.getCode().equals(eventCode)){
  230. exceptionType="摄像头";
  231. }else if(DeviceErrorRecordTypesEnum.COMPRESSOR.getCode().equals(eventCode)){
  232. exceptionType="压缩机";
  233. }else{
  234. log.info("设备告警短信通知:异常类型未配置,eventCode{}",eventCode);
  235. }
  236. //温度异常
  237. Boolean b = R.feignCheckData(mercSmsSubscriptionService.isSubscribedByCode(new MercSmsSubscriptionDto.Vo().setMercId(merc.getId()).setEventCode(eventCode)));
  238. log.info("设备告警,商户是否订阅短信通知:{},eventCode:{}",b,eventCode);
  239. boolean isSubscribed = BooleanUtil.isTrue(b);
  240. //商户是否订阅短信通知
  241. if(isSubscribed&&StrUtil.isNotEmpty(exceptionType)){
  242. UserInfoDto.Vo userInfo = R.feignCheckData(userInfoService.obj(new UserInfoDto.Vo().setUserId(merc.getUserInfoId())));
  243. if(userInfo!= null && StrUtil.isNotEmpty(userInfo.getTel())){
  244. String tel = userInfo.getTel();
  245. //短信推送 设备告警:设备${device_id}于${time}发生${exception_type}异常。请登录后台查看详情并及时处理。
  246. Long configId = MsgConfigId.SMS_NOTIFY_DEVICE.getId();
  247. String formatDate = LocalDateTimeUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN);
  248. Map<String, Object> templateParams = MapUtil.newHashMap();
  249. templateParams.put("device_id", deviceName);
  250. templateParams.put("time", formatDate);
  251. templateParams.put("exception_type", exceptionType);
  252. log.info("商户设备告警短信通知:{}", JSONUtil.toJsonPrettyStr(templateParams));
  253. // 扣除商户短信资源包
  254. R deduct = mercSmsService.deduct(new MercSmsPackageDto.Deduct()
  255. .setMercId(merc.getId())
  256. .setMsg("设备告警短信通知,设备号:" + deviceId)
  257. );
  258. if (deduct.getCode() == R.Enum.SUCCESS.getCode()) {
  259. R.feignCheck(aliSmsService.sendMsgCommon(new SmsSendCommonDTO().setSignName("").setMsgConfigId(configId).setMobile(tel).setTemplateParams(templateParams).setScene(SmsSceneEnum.B_DEVICE_ALARM_NOTIFY.getScene())));
  260. }else{
  261. log.info("设备告警短信通知#商户{}短信资源包不足,无法发送短信通知:{}",merc.getId());
  262. }
  263. }
  264. }
  265. }
  266. /**
  267. * 设备故障
  268. *
  269. * @param requestParams
  270. */
  271. @Override
  272. public void deviceFault(DeviceMqttDto.RequestParams requestParams) {
  273. DeviceFaultLogDto.MqttData mqttData = JSONUtil.toBean(requestParams.getData(), DeviceFaultLogDto.MqttData.class);
  274. // log.info("设备故障监控事件消费,接收mqtt data:{}", JSONUtil.toJsonPrettyStr(mqttData));
  275. if (mqttData != null) {
  276. DeviceEventMsgDto.Vo deviceEventMsg = mqttData.getDeviceEventMsg();
  277. Long mercId = deviceEventMsg.getMercId();
  278. String code = deviceEventMsg.getCode();
  279. Long deviceId = deviceEventMsg.getDeviceId();
  280. String msg = deviceEventMsg.getMsg();
  281. //加入故障日志
  282. DeviceFaultLog log = new DeviceFaultLog()
  283. .setMercId(mercId)
  284. .setDescribes(msg)
  285. .setEventCode(code)
  286. .setEventTime(deviceEventMsg.getCreateTime())
  287. .setDeviceId(deviceId);
  288. deviceFaultLogService.save(log);
  289. List<DeviceFaultMonitorConfigDto.Vo> configList = mqttData.getConfigList();
  290. // 按故障级别排序 降序
  291. List<DeviceFaultMonitorConfigDto.Vo> configSortedList = configList.stream()
  292. .sorted(Comparator.comparing(DeviceFaultMonitorConfigDto.Vo::getFaultLevel).reversed())
  293. .collect(Collectors.toList());
  294. for (DeviceFaultMonitorConfigDto.Vo config : configSortedList) {
  295. Integer eventNum = config.getEventNum();
  296. Long minuteNum = config.getMinuteNum();
  297. Integer faultLevel = config.getFaultLevel();
  298. Long monitorId = config.getMonitorId();
  299. Boolean toAfterSale = BooleanUtil.isTrue(config.getToAfterSale());
  300. // 获取当前时间
  301. Date now = DateUtil.date();
  302. // 计算最近n分钟的开始时间
  303. Date minutesAgo = DateUtil.offsetMinute(now, -minuteNum.intValue());
  304. //统计同类事件累计次数
  305. Long count = deviceFaultLogService.count(Wrappers.<DeviceFaultLog>lambdaQuery()
  306. .eq(DeviceFaultLog::getDeviceId, deviceId)
  307. .eq(DeviceFaultLog::getMercId, mercId)
  308. .eq(DeviceFaultLog::getEventCode, code)
  309. .ge(DeviceFaultLog::getEventTime, minutesAgo)
  310. );
  311. //达到阈值次数
  312. if (count != null && count.intValue() >= eventNum) {
  313. //故障表入库
  314. //是否已存在 未解决的
  315. DeviceFaultInfo deviceFaultInfo = deviceFaultInfoService.getOne(Wrappers.<DeviceFaultInfo>lambdaQuery()
  316. .eq(DeviceFaultInfo::getDeviceId, deviceId)
  317. .eq(DeviceFaultInfo::getMercId, mercId)
  318. .eq(DeviceFaultInfo::getEventCode, code)
  319. .eq(DeviceFaultInfo::getState, false)
  320. );
  321. if (deviceFaultInfo != null) {
  322. Integer faultLevel1 = deviceFaultInfo.getFaultLevel();
  323. if (faultLevel1.intValue() < faultLevel) {
  324. //小于,当前故障级别,更新
  325. deviceFaultInfo.setFaultLevel(faultLevel);
  326. deviceFaultInfo.setToAfterSale(toAfterSale);
  327. deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
  328. }
  329. //累加次数
  330. deviceFaultInfo.setEventNum(deviceFaultInfo.getEventNum() + 1).setMonitorId(monitorId);
  331. deviceFaultInfoService.updateById(deviceFaultInfo);
  332. } else {
  333. deviceFaultInfo = new DeviceFaultInfo()
  334. .setMercId(mercId)
  335. .setDeviceId(deviceId)
  336. .setDescribes(msg)
  337. .setFaultLevel(faultLevel)
  338. .setEventCode(code)
  339. .setEventNum(count.intValue())
  340. .setMonitorId(monitorId)
  341. .setToAfterSale(toAfterSale);
  342. deviceFaultInfoService.save(deviceFaultInfo);
  343. deviceInfoService.updateById(new DeviceInfo().setDeviceId(deviceId).setFaultLevel(faultLevel));
  344. }
  345. //给售后推送故障信息
  346. if (toAfterSale.booleanValue()) {
  347. deviceFaultPushMsg(deviceEventMsg);
  348. }
  349. //已按最大故障级别排序,所以满足条件就退出循环
  350. break;
  351. }
  352. }
  353. }
  354. }
  355. /**
  356. * 故障推送给所有售后
  357. *
  358. * @param deviceEventMsg
  359. */
  360. public void deviceFaultPushMsg(DeviceEventMsgDto.Vo deviceEventMsg) {
  361. if (deviceEventMsg != null) {
  362. Long deviceId = deviceEventMsg.getDeviceId();
  363. Long mercId = deviceEventMsg.getMercId();
  364. LocalDateTime createTime = deviceEventMsg.getCreateTime();
  365. String msg = deviceEventMsg.getMsg();
  366. String code = deviceEventMsg.getCode();
  367. Long configId = MsgConfigId.DEVICE_EXCEPTION.getId();
  368. MsgConfigDto.Vo msgConfig = R.feignCheckData(msgSendApiService.getMsgConfig(new MsgConfigDto.Vo().setId(configId)));
  369. List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
  370. if (CollUtil.isNotEmpty(bizParams) && msgConfig != null) {
  371. List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
  372. List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
  373. DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
  374. String deviceName = deviceInfo.getDeviceName();
  375. if (StrUtil.isEmpty(deviceName)) {
  376. deviceName = deviceId + "";
  377. } else {
  378. deviceName = deviceName + "(" + deviceId + ")";
  379. }
  380. // 获取售后及售后创建的角色用户
  381. UserInfoDto.SaleBackUsersVO saleBackUsersVO = R.feignCheckData(userInfoService.getSaleBackUsers());
  382. List<String> mpOpenIds = saleBackUsersVO.getMpOpenIds();
  383. //开门超时事件增加商户推送 ERR0020
  384. if ("ERR0020".equals(code)) {
  385. MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
  386. Long userInfoId = merc.getUserInfoId();
  387. List<Long> userInfoIdList = new ArrayList<>();
  388. userInfoIdList.add(userInfoId);
  389. List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
  390. new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
  391. if (CollUtil.isNotEmpty(mercUserDevices)) {
  392. mercUserDevices.forEach(mud -> {
  393. Long deviceIdUser = mud.getDeviceId();
  394. if (deviceId == deviceIdUser) {
  395. userInfoIdList.add(mud.getUserId());
  396. }
  397. });
  398. }
  399. List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
  400. if (CollUtil.isNotEmpty(userInfoList)) {
  401. Set<String> mercMpOpenIds = userInfoList.stream().filter(u -> ObjectUtil.isNotEmpty(u.getMpOpenid()))
  402. .map(UserInfoDto.Vo::getMpOpenid)
  403. .collect(Collectors.toSet());
  404. mpOpenIds.addAll(mercMpOpenIds);
  405. }
  406. }
  407. if (CollUtil.isNotEmpty(mpOpenIds)) {
  408. Set<String> mpIds = new HashSet<>(mpOpenIds);
  409. for (MsgConfigTestDto.BizData b : bizDataList) {
  410. String channelType = b.getChannelType();
  411. if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
  412. //微信公众号
  413. //扩展参数
  414. MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
  415. Map<String, Object> params = MapUtil.newHashMap();
  416. params.put("keyword1", deviceName);
  417. params.put("keyword2", msg);
  418. params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
  419. //商户管理员发
  420. b.setReceivers(mpIds);
  421. b.setTemplateParams(params);
  422. b.setExtraParam(bizExtraParam);
  423. sendList.add(b);
  424. }
  425. }
  426. }
  427. //是否可推送条件
  428. if (CollUtil.isNotEmpty(sendList)) {
  429. msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
  430. }
  431. }
  432. }
  433. }
  434. /**
  435. * 设备联网状态
  436. *
  437. * @param requestParams
  438. * @param netState
  439. */
  440. private void netWork(DeviceMqttDto.RequestParams requestParams, int netState) {
  441. JSONObject jsonObject = JSONUtil.parseObj(requestParams.getData());
  442. String clientid = jsonObject.getStr("clientid");
  443. Long deviceId;
  444. try {
  445. deviceId = Long.valueOf(clientid);
  446. } catch (Exception e) {
  447. return;
  448. }
  449. //发布事件
  450. DeviceEvent.NetWorkEvent netWorkEvent = new DeviceEvent.NetWorkEvent()
  451. .setNetState(netState);
  452. netWorkEvent.setValue(deviceId);
  453. EventUtils.push(() -> netWorkEvent);
  454. }
  455. }