|
@@ -0,0 +1,139 @@
|
|
|
+package com.xy.consumer.device.push.msg;
|
|
|
+
|
|
|
+import cn.hutool.core.bean.BeanUtil;
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
+import cn.hutool.core.date.DatePattern;
|
|
|
+import cn.hutool.core.date.DateUtil;
|
|
|
+import cn.hutool.core.map.MapUtil;
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
|
+import com.xy.consumer.MqttConsumer;
|
|
|
+import com.xy.dto.MsgConfigTestDto;
|
|
|
+import com.xy.dto.UserInfoDto;
|
|
|
+import com.xy.dto.be.MercDto;
|
|
|
+import com.xy.dto.be.MercUserDeviceDto;
|
|
|
+import com.xy.entity.DeviceEventMsg;
|
|
|
+import com.xy.entity.DeviceInfo;
|
|
|
+import com.xy.entity.SysCodeConfigureRedis;
|
|
|
+import com.xy.enums.ChannelType;
|
|
|
+import com.xy.service.DeviceInfoServiceImpl;
|
|
|
+import com.xy.service.MsgSendApiService;
|
|
|
+import com.xy.service.UserInfoService;
|
|
|
+import com.xy.service.be.MercFeignService;
|
|
|
+import com.xy.utils.R;
|
|
|
+import com.xy.utils.SysCodeConfigureUtils;
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.*;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * 设备消息推送消费者
|
|
|
+ *
|
|
|
+ * @author 谭斌
|
|
|
+ * @date 2023/06/06
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+@AllArgsConstructor
|
|
|
+public class DevicePushMsgConsumer implements MqttConsumer {
|
|
|
+
|
|
|
+ private MsgSendApiService msgSendApiService;
|
|
|
+ private DeviceInfoServiceImpl deviceInfoService;
|
|
|
+ private MercFeignService mercFeignService;
|
|
|
+ private UserInfoService userInfoService;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean message(String topic, String payload) {
|
|
|
+ log.info("-----设备故障消息推送-----:{}", payload);
|
|
|
+ DeviceEventMsg deviceEventMsg = JSONUtil.toBean(payload, DeviceEventMsg.class);
|
|
|
+ if (deviceEventMsg != null) {
|
|
|
+ Long deviceId = deviceEventMsg.getDeviceId();
|
|
|
+ Long mercId = deviceEventMsg.getMercId();
|
|
|
+ LocalDateTime createTime = deviceEventMsg.getCreateTime();
|
|
|
+ String msg = deviceEventMsg.getMsg();
|
|
|
+ SysCodeConfigureRedis sysCodeConfigureRedis = SysCodeConfigureUtils.get(deviceEventMsg.getCode());
|
|
|
+
|
|
|
+ Long configId = 12L;
|
|
|
+ List<MsgConfigTestDto.BizParam> bizParams = R.feignCheckData(msgSendApiService.getBizParamByMsgConfig(new MsgConfigTestDto.MsgConfig().setConfigId(configId)));
|
|
|
+ if (CollUtil.isNotEmpty(bizParams)) {
|
|
|
+ List<MsgConfigTestDto.BizData> bizDataList = BeanUtil.copyToList(bizParams, MsgConfigTestDto.BizData.class);
|
|
|
+ List<MsgConfigTestDto.BizData> sendList = new ArrayList<>();
|
|
|
+ for (MsgConfigTestDto.BizData b : bizDataList) {
|
|
|
+ String channelType = b.getChannelType();
|
|
|
+ if (Integer.valueOf(channelType).intValue() == ChannelType.OFFICIAL_ACCOUNT.getCode().intValue()) {
|
|
|
+ //TODO: 暂无订阅功能。给商户管理员、以及有此设备权限的人推送消息, 查询微信公众号openID
|
|
|
+ MercDto.Vo merc = R.feignCheckData(mercFeignService.obj(new MercDto.ListDTO().setId(mercId)));
|
|
|
+ Long userInfoId = merc.getUserInfoId();
|
|
|
+ List<Long> userInfoIdList = new ArrayList<>();
|
|
|
+ userInfoIdList.add(userInfoId);
|
|
|
+ List<MercUserDeviceDto.Vo> mercUserDevices = R.feignCheckData(mercFeignService.mercDeviceUsers(
|
|
|
+ new MercUserDeviceDto.Vo().setMercId(mercId).setDeviceId(deviceId)));
|
|
|
+ if (CollUtil.isNotEmpty(mercUserDevices)) {
|
|
|
+ List<Long> uids = mercUserDevices.stream().map(MercUserDeviceDto.Vo::getUserId).collect(Collectors.toList());
|
|
|
+ if (CollUtil.isNotEmpty(uids)) {
|
|
|
+ userInfoIdList.addAll(uids);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //查询userinfo表的mpOpenId
|
|
|
+ List<UserInfoDto.Vo> userInfoList = R.feignCheckData(userInfoService.list(new UserInfoDto.SelectListDto().setUserIds(userInfoIdList)));
|
|
|
+
|
|
|
+ Set<String> mpOpenIds = userInfoList.stream().filter(Objects::nonNull)
|
|
|
+ .map(UserInfoDto.Vo::getMpOpenid)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ if (CollUtil.isEmpty(mpOpenIds)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {{first.DATA}}
|
|
|
+ * 设备名称:{{keyword1.DATA}}
|
|
|
+ * 告警内容:{{keyword2.DATA}}
|
|
|
+ * 发生时间:{{keyword3.DATA}}
|
|
|
+ * {{remark.DATA}}
|
|
|
+ *
|
|
|
+ */
|
|
|
+ DeviceInfo deviceInfo = deviceInfoService.getById(deviceId);
|
|
|
+ String deviceName = deviceInfo.getDeviceName();
|
|
|
+ if (StrUtil.isEmpty(deviceName)) {
|
|
|
+ deviceName = deviceId + "";
|
|
|
+ } else {
|
|
|
+ deviceName = deviceName + "(" + deviceId + ")";
|
|
|
+ }
|
|
|
+
|
|
|
+ //微信公众号
|
|
|
+ //扩展参数
|
|
|
+ MsgConfigTestDto.BizExtraParam bizExtraParam = new MsgConfigTestDto.BizExtraParam();
|
|
|
+ if (ObjectUtil.isNotEmpty(sysCodeConfigureRedis)) {
|
|
|
+ String propose = sysCodeConfigureRedis.getPropose();
|
|
|
+ if (StrUtil.isNotEmpty(propose) && propose.contains("http")) {
|
|
|
+ bizExtraParam.setUrl(propose);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Map<String, Object> params = MapUtil.newHashMap();
|
|
|
+ params.put("keyword1", deviceName);
|
|
|
+ params.put("keyword2", msg);
|
|
|
+ params.put("keyword3", DateUtil.format(createTime, DatePattern.NORM_DATETIME_PATTERN));
|
|
|
+
|
|
|
+ //商户管理员发
|
|
|
+ b.setReceivers(mpOpenIds);
|
|
|
+ b.setTemplateParams(params);
|
|
|
+ b.setExtraParam(bizExtraParam);
|
|
|
+ sendList.add(b);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (CollUtil.isNotEmpty(sendList)) {
|
|
|
+ msgSendApiService.sendByMsgConfig(new MsgConfigTestDto.SendByMsgConfig().setConfigId(configId).setBizDataList(sendList));
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|