// MqttServiceImpl.java
  1. package com.xy.service;
  2. import cn.hutool.json.JSONObject;
  3. import cn.hutool.json.JSONUtil;
  4. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  5. import com.baomidou.mybatisplus.core.metadata.IPage;
  6. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  7. import com.xy.annotation.LogOperate;
  8. import com.xy.collections.list.JArrayList;
  9. import com.xy.collections.list.JList;
  10. import com.xy.collections.map.JHashMap;
  11. import com.xy.collections.map.JMap;
  12. import com.xy.dto.*;
  13. import com.xy.entity.DeviceInfo;
  14. import com.xy.entity.MqttCommand;
  15. import com.xy.enums.LogEnum;
  16. import com.xy.mapper.MqttCommandMapper;
  17. import com.xy.service.factory.cmd.CmdService;
  18. import com.xy.service.factory.device.DeviceFactory;
  19. import com.xy.utils.*;
  20. import com.xy.utils.enums.MqttCommandStatusEnum;
  21. import io.swagger.annotations.Api;
  22. import io.swagger.annotations.ApiOperation;
  23. import lombok.RequiredArgsConstructor;
  24. import lombok.extern.slf4j.Slf4j;
  25. import org.springframework.stereotype.Service;
  26. import org.springframework.util.StringUtils;
  27. import org.springframework.validation.annotation.Validated;
  28. import org.springframework.web.bind.annotation.PostMapping;
  29. import org.springframework.web.bind.annotation.RequestBody;
  30. import java.util.ArrayList;
  31. import java.util.List;
  32. import java.util.Map;
  33. import static com.xy.utils.PlusBeans.*;
  34. @Slf4j
  35. @Service
  36. @Api(tags = "mqtt接口")
  37. @RequiredArgsConstructor
  38. public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand> implements MqttService {
  39. private final DeviceMqttSendService deviceMqttSendService;
  40. private final RedisService<String> redisService;
  41. private final DeviceInfoServiceImpl deviceInfoService;
  42. private String keyStr = "mqtt:cmd:result:%d";
  43. public long getSn() {
  44. return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN);
  45. }
  46. public long getWkSn() {
  47. return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN);
  48. }
  49. public List<Tuple.Tuple3<Long, Long, Boolean>> sendMqtt(List<MqttCmdDto> mqttDtos) {
  50. String time = DataTime.getSring();
  51. List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>(mqttDtos.size());
  52. mqttDtos.forEach(mqttDto -> {
  53. //给予默认值
  54. PaterDto pater = mqttDto.getPater()
  55. .setSn(getSn())
  56. .setTime(time)
  57. .setDeviceId(mqttDto.getDeviceId())
  58. .setDebug(mqttDto.getDebug());
  59. //延迟发布处理
  60. String delayTime = mqttDto.getDelayTime();
  61. String topic = mqttDto.getDeviceId() + CommConsts.DEVICE_MQTT_TOPIC_SUFFIX;
  62. if (!StringUtils.isEmpty(delayTime)) {
  63. //消息消费时间 - 发布时间的差值 = 距今被消费的秒数
  64. long m = DataTime.diff(DataTime.toLocal(pater.getTime()), DataTime.toLocal(delayTime), "s");
  65. topic = String.format("$delayed/%d/%s", m, topic);
  66. mqttDto.setTimeout(mqttDto.getTimeout() + (int) m);
  67. }
  68. //消息过期时间
  69. pater.setTimeout(mqttDto.getTimeout());
  70. JSONObject value = JSONUtil.parseObj(pater);
  71. Long wkSn = pater.getWkSn();
  72. try {
  73. //指令记录
  74. saveMqttCommand(mqttDto, topic, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_EXE_SUCCESS, null);
  75. //发送消息
  76. deviceMqttSendService.cmd(new MqttDto.CmdParams()
  77. .setQos(mqttDto.getLevel())
  78. .setData(value.toString())
  79. .setTopic(topic)
  80. );
  81. list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), true));
  82. } catch (Exception e) {
  83. //指令记录
  84. saveMqttCommand(mqttDto, topic, value, MqttCommandStatusEnum.COMMAND_SEND_FAIL, e.getMessage());
  85. list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), false));
  86. }
  87. });
  88. return list;
  89. }
  90. @Override
  91. @LogOperate
  92. @ApiOperation("指令发送")
  93. public R<List<Tuple.Tuple3<Long, Long, Boolean>>> senCommand(List<CommandMqtt> commandMqtts) {
  94. JList<CommandMqtt> commandMqttsList = new JArrayList<>(commandMqtts);
  95. List<DeviceInfo> deviceInfos = deviceInfoService.list(new LambdaQueryWrapper<DeviceInfo>().in(DeviceInfo::getDeviceId, commandMqttsList.getProperty(CommandMqtt::getDeviceId).comparing()));
  96. JMap<Long, DeviceInfo> deviceInfosJMaps = new JArrayList<>(deviceInfos).toMap(DeviceInfo::getDeviceId).cover();
  97. JMap<Integer, JList<CommandMqtt>> jMap = new JHashMap<>();
  98. commandMqtts.forEach(commandMqtt -> {
  99. DeviceInfo deviceInfo = deviceInfosJMaps.get(commandMqtt.getDeviceId());
  100. if (deviceInfo == null) {
  101. return;
  102. }
  103. if (!jMap.containsKey(deviceInfo.getDeviceType())) {
  104. jMap.put(deviceInfo.getDeviceType(), new JArrayList<>());
  105. }
  106. jMap.get(deviceInfo.getDeviceType()).set(commandMqtt);
  107. });
  108. List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>();
  109. jMap.forEach((deviceType, commandMqttss) -> {
  110. List<Tuple.Tuple3<Long, Long, Boolean>> data = FactoryUtils.getServiceRoute(DeviceFactory.class, deviceType)
  111. .senCommand(commandMqttss).getData();
  112. if (data != null) {
  113. list.addAll(data);
  114. }
  115. });
  116. return R.ok(list).setLogMsg(JSONUtil.parseArray(jMap.getValues()).toString());
  117. }
  118. @ApiOperation("指令Ack")
  119. @PostMapping("commandAck")
  120. public R commandAck(@RequestBody @Validated AckMqttDto ackMqttDto) {
  121. MqttCommand mqttCommand = getById(ackMqttDto.getSn());
  122. if (!Emptys.check(mqttCommand)) {
  123. return R.fail("数据不存在");
  124. }
  125. //写入mysql
  126. mqttCommand.setStatus(Integer.valueOf(ackMqttDto.getStatusEnum().getKey()))
  127. .setBackClientTime(DataTime.toString(ackMqttDto.getBackClientTime()))
  128. .setBackServerTime(DataTime.getSring());
  129. updateById(mqttCommand);
  130. //写入redis
  131. String result = ackMqttDto.getResult().toString();
  132. if (Emptys.check(ackMqttDto.getResult())) {
  133. String key = String.format(keyStr, ackMqttDto.getSn());
  134. String backClientTime = DataTime.toString(ackMqttDto.getBackClientTime());
  135. Map<String, String> map = redisService.getMap(key);
  136. if (Emptys.check(map)) {
  137. String time = map.get("time");
  138. if (DataTime.stringContrast(time, backClientTime) >= 0) {
  139. return R.ok();
  140. }
  141. }
  142. JMap<String, String> jmap = new JHashMap<String, String>().set("time", backClientTime).set("result", result);
  143. redisService.setMap(key, jmap);
  144. redisService.timeout(key, 300);
  145. }
  146. return R.ok();
  147. }
  148. @ApiOperation("指令结果通知")
  149. @PostMapping("commandResultBack")
  150. public R commandResultBack(@RequestBody @Validated MqttCmdDto.ResultBack resultBack) {
  151. return FactoryUtils.getService(CmdService.class, resultBack.getCmdType()).resultBack(resultBack);
  152. }
  153. @PostMapping("snByCmdAndResult")
  154. @ApiOperation("sn查询指令和结果数据")
  155. public R<MqttCmdDto.Vo3> snByCmdAndResult(@RequestBody MqttCmdDto.SnByCmdAndResult snByCmdAndResult) {
  156. MqttCommand mqttCommand = getById(snByCmdAndResult.getSn());
  157. String key = String.format(keyStr, snByCmdAndResult.getSn());
  158. Map<String, String> map = redisService.getMap(key);
  159. MqttCmdDto.Vo3 vo3 = copy(MqttCmdDto.Vo3.class, mqttCommand);
  160. if (Emptys.check(map)) {
  161. String result = map.get("result");
  162. vo3.setResult(JSONUtil.parseObj(result));
  163. }
  164. return R.ok(vo3);
  165. }
  166. @PostMapping("page")
  167. @ApiOperation("分页查询")
  168. public R<PageBean<MqttCmdDto.Vo2>> page(@RequestBody MqttCmdDto.Page page) {
  169. LambdaQueryWrapper<MqttCommand> lambdaQueryWrapper = new MybatisPlusQuery().eqWrapper(page, MqttCommand.class)
  170. .like(MqttCommand::getError)
  171. .like(MqttCommand::getValue)
  172. .ge(MqttCommand::getSendTime, page.getBeginSendTime())
  173. .le(MqttCommand::getSendTime, page.getEndSendTime())
  174. .ge(MqttCommand::getTimeout, page.getBeginTimeout())
  175. .le(MqttCommand::getTimeout, page.getEndTimeout())
  176. .ge(MqttCommand::getDelayTime, page.getBeginDelayTime())
  177. .le(MqttCommand::getDelayTime, page.getEndDelayTime())
  178. .build()
  179. .orderByDesc(MqttCommand::getSendTime);
  180. PageBean pageBean = page.getPage();
  181. IPage<MqttCommand> iPage = page(toIPage(pageBean), lambdaQueryWrapper);
  182. return R.ok(toPageBean(MqttCmdDto.Vo2.class, iPage));
  183. }
  184. /**
  185. * 新增指令记录
  186. *
  187. * @param mqttDto 参数对象
  188. * @param value 参数json对象
  189. * @param status 状态
  190. * @param error 错误信息
  191. */
  192. private void saveMqttCommand(MqttCmdDto mqttDto, String topic, JSONObject value, MqttCommandStatusEnum status, String error) {
  193. PaterDto pater = mqttDto.getPater();
  194. MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater)
  195. .setSn(pater.getSn())
  196. .setWkSn(pater.getWkSn())
  197. .setTopic(topic)
  198. .setStatus(Integer.parseInt(status.getKey()))
  199. .setValue(value.toString())
  200. .setSendTime(pater.getTime())
  201. .setNum(1)
  202. .setError(error);
  203. save(mqttCommand);
  204. }
  205. }