MqttServiceImpl.java 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package com.xy.service;
  2. import cn.hutool.json.JSONObject;
  3. import cn.hutool.json.JSONUtil;
  4. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  5. import com.baomidou.mybatisplus.core.metadata.IPage;
  6. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  7. import com.xy.annotation.LogOperate;
  8. import com.xy.collections.list.JArrayList;
  9. import com.xy.collections.list.JList;
  10. import com.xy.collections.map.JHashMap;
  11. import com.xy.collections.map.JMap;
  12. import com.xy.dto.*;
  13. import com.xy.entity.DeviceInfo;
  14. import com.xy.entity.MqttCommand;
  15. import com.xy.mapper.MqttCommandMapper;
  16. import com.xy.service.factory.cmd.CmdService;
  17. import com.xy.service.factory.device.DeviceFactory;
  18. import com.xy.utils.*;
  19. import com.xy.utils.enums.MqttCommandStatusEnum;
  20. import io.swagger.annotations.Api;
  21. import io.swagger.annotations.ApiOperation;
  22. import lombok.RequiredArgsConstructor;
  23. import lombok.extern.slf4j.Slf4j;
  24. import org.springframework.stereotype.Service;
  25. import org.springframework.util.StringUtils;
  26. import org.springframework.validation.annotation.Validated;
  27. import org.springframework.web.bind.annotation.PostMapping;
  28. import org.springframework.web.bind.annotation.RequestBody;
  29. import java.util.ArrayList;
  30. import java.util.List;
  31. import java.util.Map;
  32. import static com.xy.utils.PlusBeans.*;
  33. @Slf4j
  34. @Service
  35. @Api(tags = "mqtt接口")
  36. @RequiredArgsConstructor
  37. public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand> implements MqttService {
  38. private final DeviceMqttSendService deviceMqttSendService;
  39. private final RedisService<String> redisService;
  40. private final DeviceInfoServiceImpl deviceInfoService;
  41. private String keyStr = "mqtt:cmd:result:%d";
  42. public long getSn() {
  43. return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN);
  44. }
  45. public long getWkSn() {
  46. return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN);
  47. }
  48. public List<Tuple.Tuple3<Long, Long, Boolean>> sendMqtt(List<MqttCmdDto> mqttDtos) {
  49. String time = DataTime.getSring();
  50. List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>(mqttDtos.size());
  51. mqttDtos.forEach(mqttDto -> {
  52. //给予默认值
  53. PaterDto pater = mqttDto.getPater()
  54. .setSn(getSn())
  55. .setTime(time)
  56. .setDeviceId(mqttDto.getDeviceId())
  57. .setDebug(mqttDto.getDebug());
  58. //延迟发布处理
  59. String delayTime = mqttDto.getDelayTime();
  60. String topic = mqttDto.getDeviceId() + CommConsts.DEVICE_MQTT_TOPIC_SUFFIX;
  61. if (!StringUtils.isEmpty(delayTime)) {
  62. //消息消费时间 - 发布时间的差值 = 距今被消费的秒数
  63. long m = DataTime.diff(DataTime.toLocal(pater.getTime()), DataTime.toLocal(delayTime), "s");
  64. topic = String.format("$delayed/%d/%s", m, topic);
  65. mqttDto.setTimeout(mqttDto.getTimeout() + (int) m);
  66. }
  67. //消息过期时间
  68. pater.setTimeout(mqttDto.getTimeout());
  69. JSONObject value = JSONUtil.parseObj(pater);
  70. Long wkSn = pater.getWkSn();
  71. try {
  72. //指令记录
  73. saveMqttCommand(mqttDto, topic, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_EXE_SUCCESS, null);
  74. //发送消息
  75. deviceMqttSendService.cmd(new MqttDto.CmdParams()
  76. .setQos(mqttDto.getLevel())
  77. .setData(value.toString())
  78. .setTopic(topic)
  79. );
  80. list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), true));
  81. } catch (Exception e) {
  82. //指令记录
  83. saveMqttCommand(mqttDto, topic, value, MqttCommandStatusEnum.COMMAND_SEND_FAIL, e.getMessage());
  84. list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), false));
  85. }
  86. });
  87. return list;
  88. }
  89. @Override
  90. @LogOperate
  91. @ApiOperation("指令发送")
  92. public R<List<Tuple.Tuple3<Long, Long, Boolean>>> senCommand(List<CommandMqtt> commandMqtts) {
  93. JList<CommandMqtt> commandMqttsList = new JArrayList<>(commandMqtts);
  94. List<DeviceInfo> deviceInfos = deviceInfoService.list(new LambdaQueryWrapper<DeviceInfo>().in(DeviceInfo::getDeviceId, commandMqttsList.getProperty(CommandMqtt::getDeviceId).comparing()));
  95. JMap<Long, DeviceInfo> deviceInfosJMaps = new JArrayList<>(deviceInfos).toMap(DeviceInfo::getDeviceId).cover();
  96. JMap<Integer, JList<CommandMqtt>> jMap = new JHashMap<>();
  97. commandMqtts.forEach(commandMqtt -> {
  98. DeviceInfo deviceInfo = deviceInfosJMaps.get(commandMqtt.getDeviceId());
  99. if (deviceInfo == null) {
  100. return;
  101. }
  102. if (!jMap.containsKey(deviceInfo.getDeviceType())) {
  103. jMap.put(deviceInfo.getDeviceType(), new JArrayList<>());
  104. }
  105. jMap.get(deviceInfo.getDeviceType()).set(commandMqtt);
  106. });
  107. List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>();
  108. jMap.forEach((deviceType, commandMqttss) -> {
  109. List<Tuple.Tuple3<Long, Long, Boolean>> data = FactoryUtils.getServiceRoute(DeviceFactory.class, deviceType)
  110. .senCommand(commandMqttss).getData();
  111. if (data != null) {
  112. list.addAll(data);
  113. }
  114. });
  115. JList<JList<CommandMqtt>> values = jMap.getValues();
  116. values.forEach(value -> value.forEach(commandMqtt -> {
  117. JSONObject templet = commandMqtt.getTemplet();
  118. templet.set("search", String.format("%s_%s", commandMqtt.getDeviceId(), templet.getJSONObject("data").getStr("type")));
  119. commandMqtt.setTemplet(templet);
  120. }));
  121. return R.ok(list).setLogMsg(JSONUtil.parseArray(values).toString());
  122. }
  123. @ApiOperation("指令Ack")
  124. @PostMapping("commandAck")
  125. public R commandAck(@RequestBody @Validated AckMqttDto ackMqttDto) {
  126. MqttCommand mqttCommand = getById(ackMqttDto.getSn());
  127. if (!Emptys.check(mqttCommand)) {
  128. return R.fail("数据不存在");
  129. }
  130. //写入mysql
  131. mqttCommand.setStatus(Integer.valueOf(ackMqttDto.getStatusEnum().getKey()))
  132. .setBackClientTime(DataTime.toString(ackMqttDto.getBackClientTime()))
  133. .setBackServerTime(DataTime.getSring());
  134. updateById(mqttCommand);
  135. //写入redis
  136. String result = ackMqttDto.getResult().toString();
  137. if (Emptys.check(ackMqttDto.getResult())) {
  138. String key = String.format(keyStr, ackMqttDto.getSn());
  139. String backClientTime = DataTime.toString(ackMqttDto.getBackClientTime());
  140. Map<String, String> map = redisService.getMap(key);
  141. if (Emptys.check(map)) {
  142. String time = map.get("time");
  143. if (DataTime.stringContrast(time, backClientTime) >= 0) {
  144. return R.ok();
  145. }
  146. }
  147. JMap<String, String> jmap = new JHashMap<String, String>().set("time", backClientTime).set("result", result);
  148. redisService.setMap(key, jmap);
  149. redisService.timeout(key, 300);
  150. }
  151. return R.ok();
  152. }
  153. @ApiOperation("指令结果通知")
  154. @PostMapping("commandResultBack")
  155. public R commandResultBack(@RequestBody @Validated MqttCmdDto.ResultBack resultBack) {
  156. return FactoryUtils.getService(CmdService.class, resultBack.getCmdType()).resultBack(resultBack);
  157. }
  158. @PostMapping("snByCmdAndResult")
  159. @ApiOperation("sn查询指令和结果数据")
  160. public R<MqttCmdDto.Vo3> snByCmdAndResult(@RequestBody MqttCmdDto.SnByCmdAndResult snByCmdAndResult) {
  161. MqttCommand mqttCommand = getById(snByCmdAndResult.getSn());
  162. String key = String.format(keyStr, snByCmdAndResult.getSn());
  163. Map<String, String> map = redisService.getMap(key);
  164. MqttCmdDto.Vo3 vo3 = copy(MqttCmdDto.Vo3.class, mqttCommand);
  165. if (Emptys.check(map)) {
  166. String result = map.get("result");
  167. vo3.setResult(JSONUtil.parseObj(result));
  168. }
  169. return R.ok(vo3);
  170. }
  171. @PostMapping("page")
  172. @ApiOperation("分页查询")
  173. public R<PageBean<MqttCmdDto.Vo2>> page(@RequestBody MqttCmdDto.Page page) {
  174. LambdaQueryWrapper<MqttCommand> lambdaQueryWrapper = new MybatisPlusQuery().eqWrapper(page, MqttCommand.class)
  175. .like(MqttCommand::getError)
  176. .like(MqttCommand::getValue)
  177. .ge(MqttCommand::getSendTime, page.getBeginSendTime())
  178. .le(MqttCommand::getSendTime, page.getEndSendTime())
  179. .ge(MqttCommand::getTimeout, page.getBeginTimeout())
  180. .le(MqttCommand::getTimeout, page.getEndTimeout())
  181. .ge(MqttCommand::getDelayTime, page.getBeginDelayTime())
  182. .le(MqttCommand::getDelayTime, page.getEndDelayTime())
  183. .build()
  184. .orderByDesc(MqttCommand::getSendTime);
  185. PageBean pageBean = page.getPage();
  186. IPage<MqttCommand> iPage = page(toIPage(pageBean), lambdaQueryWrapper);
  187. return R.ok(toPageBean(MqttCmdDto.Vo2.class, iPage));
  188. }
  189. /**
  190. * 新增指令记录
  191. *
  192. * @param mqttDto 参数对象
  193. * @param value 参数json对象
  194. * @param status 状态
  195. * @param error 错误信息
  196. */
  197. private void saveMqttCommand(MqttCmdDto mqttDto, String topic, JSONObject value, MqttCommandStatusEnum status, String error) {
  198. PaterDto pater = mqttDto.getPater();
  199. MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater)
  200. .setSn(pater.getSn())
  201. .setWkSn(pater.getWkSn())
  202. .setTopic(topic)
  203. .setStatus(Integer.parseInt(status.getKey()))
  204. .setValue(value.toString())
  205. .setSendTime(pater.getTime())
  206. .setNum(1)
  207. .setError(error);
  208. save(mqttCommand);
  209. }
  210. }