MqttServiceImpl.java 12 KB


  1. package com.xy.service;
  2. import cn.hutool.json.JSONObject;
  3. import cn.hutool.json.JSONUtil;
  4. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  5. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  6. import com.baomidou.mybatisplus.core.metadata.IPage;
  7. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  8. import com.xy.alipay.AliPay;
  9. import com.xy.annotate.Runners;
  10. import com.xy.annotate.Timer;
  11. import com.xy.collections.map.JHashMap;
  12. import com.xy.collections.map.JMap;
  13. import com.xy.consumer.cmd.CmdProducer;
  14. import com.xy.dto.AckMqttDto;
  15. import com.xy.dto.CommandMqtt;
  16. import com.xy.dto.MqttDto;
  17. import com.xy.dto.PaterDto;
  18. import com.xy.entity.MqttCommand;
  19. import com.xy.error.CommRuntimeException;
  20. import com.xy.mapper.MqttCommandMapper;
  21. import com.xy.utils.*;
  22. import com.xy.utils.consts.CommConsts;
  23. import com.xy.utils.enums.CmdTypeEnum;
  24. import com.xy.utils.enums.MqttCommandStatusEnum;
  25. import io.swagger.annotations.Api;
  26. import io.swagger.annotations.ApiOperation;
  27. import lombok.RequiredArgsConstructor;
  28. import lombok.extern.slf4j.Slf4j;
  29. import org.springframework.stereotype.Service;
  30. import org.springframework.util.StringUtils;
  31. import org.springframework.validation.annotation.Validated;
  32. import org.springframework.web.bind.annotation.PostMapping;
  33. import org.springframework.web.bind.annotation.RequestBody;
  34. import java.util.ArrayList;
  35. import java.util.HashMap;
  36. import java.util.List;
  37. import java.util.Map;
  38. import static com.xy.utils.PlusBeans.*;
  39. @Slf4j
  40. @Service
  41. @Api(tags = "mqtt接口")
  42. @RequiredArgsConstructor
  43. public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand> implements MqttService {
  44. private final CmdProducer cmdProducer;
  45. private final RedisService<String> redisService;
  46. private String keyStr = "mqtt:cmd:result:%d";
  47. private boolean snInit;
  48. private long getSn() {
  49. if (!snInit) {
  50. throw new CommRuntimeException("sn号未初始化");
  51. }
  52. return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN);
  53. }
  54. private long getWkSn() {
  55. if (!snInit) {
  56. throw new CommRuntimeException("sn号未初始化");
  57. }
  58. return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN);
  59. }
  60. private List<Tuple.Tuple3<Long, Long, Boolean>> sendMqtt(List<MqttDto> mqttDtos) {
  61. String time = DataTime.getSring();
  62. List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>(mqttDtos.size());
  63. mqttDtos.forEach(mqttDto -> {
  64. //给予默认值
  65. PaterDto pater = mqttDto.getPater()
  66. .setSn(getSn())
  67. .setTime(time)
  68. .setDeviceId(mqttDto.getDeviceId())
  69. .setDebug(mqttDto.getDebug());
  70. //延迟发布处理
  71. String delayTime = mqttDto.getDelayTime();
  72. String topic = mqttDto.getDeviceId() + CommConsts.DEVICE_MQTT_TOPIC_SUFFIX;
  73. if (!StringUtils.isEmpty(delayTime)) {
  74. //消息消费时间 - 发布时间的差值 = 距今被消费的秒数
  75. long m = DataTime.diff(DataTime.toLocal(pater.getTime()), DataTime.toLocal(delayTime), "s");
  76. topic = String.format("$delayed/%d/%s", m, topic);
  77. mqttDto.setTimeout(mqttDto.getTimeout() + (int) m);
  78. }
  79. //消息过期时间
  80. pater.setTimeout(mqttDto.getTimeout());
  81. JSONObject value = JSONUtil.parseObj(pater);
  82. Long wkSn = pater.getWkSn();
  83. try {
  84. //指令记录
  85. saveMqttCommand(mqttDto, topic, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_EXE_SUCCESS, null);
  86. //发送消息
  87. cmdProducer.sendToMqtt(value.toString(), topic, mqttDto.getLevel());
  88. list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), true));
  89. } catch (Exception e) {
  90. //指令记录
  91. saveMqttCommand(mqttDto, topic, value, MqttCommandStatusEnum.COMMAND_SEND_FAIL, e.getMessage());
  92. list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), false));
  93. }
  94. });
  95. return list;
  96. }
  97. @Timer
  98. @Runners
  99. @PostMapping("sysnAtoms")
  100. @ApiOperation("同步sn号redis原子变量")
  101. public void sysnAtoms() {
  102. //todo 集群有问题,暂不处理
  103. String sn = LambdaUtils.getProperty(MqttCommand::getSn);
  104. QueryWrapper<MqttCommand> wrapper = new QueryWrapper<MqttCommand>()
  105. .select(String.format("max(%s) as %s", sn, sn));
  106. MqttCommand mqttCommand = getOne(wrapper);
  107. if (mqttCommand != null) {
  108. redisService.setAtomic(CommConsts.CMD_ATOM_SN, mqttCommand.getSn() + 1);
  109. MqttCommand byId = getById(mqttCommand.getSn());
  110. redisService.setAtomic(CommConsts.CMD_ATOM_WK_SN, byId.getWkSn() + 1);
  111. }
  112. snInit = true;
  113. }
  114. @Override
  115. @ApiOperation("指令发送")
  116. @AliPay(AliPay.Type.RESTART)
  117. public R<List<Tuple.Tuple3<Long, Long, Boolean>>> senCommand(List<CommandMqtt> commandMqtts) {
  118. //发送指令
  119. List<MqttDto> mqttDtos = new ArrayList<>();
  120. commandMqtts.forEach(commandMqtt -> {
  121. JSONObject templetObj = commandMqtt.getTemplet();
  122. String debug = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDebug));
  123. String level = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getLevel));
  124. String delayTime = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDelayTime));
  125. String timeout = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getTimeout));
  126. String wkSn = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getWkSn));
  127. String actionType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getActionType));
  128. String cmdType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getCmdType));
  129. String ack = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getAck));
  130. JSONObject data = templetObj.getJSONObject(LambdaUtils.getProperty(PaterDto::getData));
  131. PaterDto paterDto = new PaterDto()
  132. .setActionType(actionType)
  133. .setCmdType(cmdType)
  134. .setAck(Emptys.check(ack) && !ack.equals("$ack$") ? Boolean.valueOf(ack) : Emptys.check(ack) && ack.equals("$ack$") ? true : true)
  135. .setWkSn(!Emptys.check(wkSn) ? getWkSn() : "$wkSn$".equals(wkSn) ? getWkSn() : Long.valueOf(wkSn))
  136. .setData(data);
  137. MqttDto mqttDto = new MqttDto()
  138. .setDeviceId(commandMqtt.getDeviceId())
  139. .setDebug(Emptys.check(debug) && !debug.equals("$debug$") ? Boolean.valueOf(debug) : Emptys.check(debug) && debug.equals("$debug$") ? false : false)
  140. .setLevel(Emptys.check(level) && !level.equals("$level$") ? Integer.valueOf(level) : Emptys.check(level) && level.equals("$level$") ? 1 : 1)
  141. .setDelayTime(Emptys.check(delayTime) && !delayTime.equals("$delayTime$") ? delayTime : Emptys.check(delayTime) && delayTime.equals("$delayTime$") ? null : null)
  142. .setTimeout(Emptys.check(timeout) && !timeout.equals("$timeout$") ? Integer.valueOf(timeout) : Emptys.check(timeout) && timeout.equals("$timeout$") ? 60 : 60)
  143. .setPater(paterDto);
  144. mqttDtos.add(mqttDto);
  145. });
  146. List<Tuple.Tuple3<Long, Long, Boolean>> list = sendMqtt(mqttDtos);
  147. //执行业务
  148. Map<String, List<MqttDto>> map = new HashMap<>();
  149. mqttDtos.forEach(mqttDto -> {
  150. String cmdType = mqttDto.getPater().getCmdType();
  151. if (!map.containsKey(cmdType)) {
  152. map.put(cmdType, new ArrayList<>());
  153. }
  154. map.get(cmdType).add(mqttDto);
  155. });
  156. map.forEach((k, v) -> {
  157. CmdTypeEnum anEnum = CmdTypeEnum.getEnum(k);
  158. if (anEnum == null) {
  159. return;
  160. }
  161. SpringBeanUtils.getBean(anEnum.getCmdServiceClass()).senCommand(v);
  162. });
  163. return R.ok(list);
  164. }
  165. @ApiOperation("指令Ack")
  166. @PostMapping("commandAck")
  167. public R commandAck(@RequestBody @Validated AckMqttDto ackMqttDto) {
  168. MqttCommand mqttCommand = getById(ackMqttDto.getSn());
  169. if (!Emptys.check(mqttCommand)) {
  170. return R.fail("数据不存在");
  171. }
  172. //写入mysql
  173. mqttCommand.setStatus(Integer.valueOf(ackMqttDto.getStatusEnum().getKey()))
  174. .setBackClientTime(DataTime.toString(ackMqttDto.getBackClientTime()))
  175. .setBackServerTime(DataTime.getSring());
  176. updateById(mqttCommand);
  177. //写入redis
  178. String result = ackMqttDto.getResult().toString();
  179. if (Emptys.check(ackMqttDto.getResult())) {
  180. String key = String.format(keyStr, ackMqttDto.getSn());
  181. String backClientTime = DataTime.toString(ackMqttDto.getBackClientTime());
  182. Map<String, String> map = redisService.getMap(key);
  183. if (Emptys.check(map)) {
  184. String time = map.get("time");
  185. if (DataTime.stringContrast(time, backClientTime) < 0) {
  186. return R.ok();
  187. }
  188. }
  189. JMap<String, String> jmap = new JHashMap<String, String>().set("time", backClientTime).set("result", result);
  190. redisService.setMap(key, jmap);
  191. redisService.timeout(key, 300);
  192. }
  193. return R.ok();
  194. }
  195. @ApiOperation("指令结果通知")
  196. @PostMapping("commandResultBack")
  197. public R commandResultBack(@RequestBody @Validated MqttDto.ResultBack resultBack) {
  198. return SpringBeanUtils.getBean(CmdTypeEnum.getEnum(resultBack.getCmdType()).getCmdServiceClass()).resultBack(resultBack);
  199. }
  200. @PostMapping("snByCmdAndResult")
  201. @ApiOperation("sn查询指令和结果数据")
  202. public R<MqttDto.Vo3> snByCmdAndResult(@RequestBody MqttDto.SnByCmdAndResult snByCmdAndResult) {
  203. MqttCommand mqttCommand = getById(snByCmdAndResult.getSn());
  204. String key = String.format(keyStr, snByCmdAndResult.getSn());
  205. Map<String, String> map = redisService.getMap(key);
  206. MqttDto.Vo3 vo3 = copy(MqttDto.Vo3.class, mqttCommand);
  207. if (Emptys.check(map)) {
  208. String result = map.get("result");
  209. vo3.setResult(JSONUtil.parseObj(result));
  210. }
  211. return R.ok(vo3);
  212. }
  213. @PostMapping("page")
  214. @ApiOperation("分页查询")
  215. public R<PageBean<MqttDto.Vo2>> page(@RequestBody MqttDto.Page page) {
  216. LambdaQueryWrapper<MqttCommand> lambdaQueryWrapper = new MybatisPlusQuery().eqWrapper(page, MqttCommand.class)
  217. .like(MqttCommand::getError)
  218. .like(MqttCommand::getValue)
  219. .ge(MqttCommand::getSendTime, page.getBeginSendTime())
  220. .le(MqttCommand::getSendTime, page.getEndSendTime())
  221. .ge(MqttCommand::getTimeout, page.getBeginTimeout())
  222. .le(MqttCommand::getTimeout, page.getEndTimeout())
  223. .ge(MqttCommand::getDelayTime, page.getBeginDelayTime())
  224. .le(MqttCommand::getDelayTime, page.getEndDelayTime())
  225. .build()
  226. .orderByDesc(MqttCommand::getSendTime);
  227. PageBean pageBean = page.getPage();
  228. IPage<MqttCommand> iPage = page(toIPage(pageBean), lambdaQueryWrapper);
  229. return R.ok(toPageBean(MqttDto.Vo2.class, iPage));
  230. }
  231. /**
  232. * 新增指令记录
  233. *
  234. * @param mqttDto 参数对象
  235. * @param value 参数json对象
  236. * @param status 状态
  237. * @param error 错误信息
  238. */
  239. private void saveMqttCommand(MqttDto mqttDto, String topic, JSONObject value, MqttCommandStatusEnum status, String error) {
  240. PaterDto pater = mqttDto.getPater();
  241. MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater)
  242. .setSn(pater.getSn())
  243. .setWkSn(pater.getWkSn())
  244. .setTopic(topic)
  245. .setStatus(Integer.parseInt(status.getKey()))
  246. .setValue(value.toString())
  247. .setSendTime(pater.getTime())
  248. .setNum(1)
  249. .setError(error);
  250. save(mqttCommand);
  251. }
  252. }