|
@@ -0,0 +1,107 @@
|
|
|
+package com.xy.service;
|
|
|
+
|
|
|
+import cn.hutool.json.JSONObject;
|
|
|
+import com.github.yitter.idgen.YitIdHelper;
|
|
|
+import com.xy.dto.BackMqttDto;
|
|
|
+import com.xy.dto.MqttDto;
|
|
|
+import com.xy.dto.pater.PaterDto;
|
|
|
+import com.xy.entity.MqttCommand;
|
|
|
+import com.xy.mapper.MqttCommandMapper;
|
|
|
+import com.xy.producer.MqttProducer;
|
|
|
+import com.xy.utils.*;
|
|
|
+import com.xy.utils.enums.MqttEnum;
|
|
|
+import io.swagger.annotations.Api;
|
|
|
+import io.swagger.annotations.ApiOperation;
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+@Service
|
|
|
+@Api(tags = "mqtt接口")
|
|
|
+@AllArgsConstructor
|
|
|
+public class MqttServiceImpl implements MqttService {
|
|
|
+
|
|
|
+ private MqttProducer mqttProducer;
|
|
|
+
|
|
|
+ private MqttCommandMapper mqttCommandMapper;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @ApiOperation("指令发布")
|
|
|
+ public List<MqttDto.Vo> sendMqtt(List<MqttDto> mqttDtos) {
|
|
|
+ List<MqttDto.Vo> list = new ArrayList<>(mqttDtos.size());
|
|
|
+ mqttDtos.forEach(mqttDto -> {
|
|
|
+ //给予默认值
|
|
|
+ JSONObject command = mqttDto.getCommand();
|
|
|
+ command.set(LambdaUtils.getProperty(PaterDto::getSn), YitIdHelper.nextId())
|
|
|
+ .set(LambdaUtils.getProperty(PaterDto::getTime), DataTime.getSring());
|
|
|
+ //延迟发布处理
|
|
|
+ String delayTime = mqttDto.getDelayTime();
|
|
|
+ String topic = mqttDto.getTopic();
|
|
|
+ if (!StringUtils.isEmpty(delayTime)) {
|
|
|
+ //消息消费时间 - 发布时间的差值 = 距今被消费的秒数
|
|
|
+ long m = DataTime.diff(DataTime.toLocal(command.getStr(LambdaUtils.getProperty(PaterDto::getTime))), DataTime.toLocal(delayTime), "s");
|
|
|
+ topic = String.format("$delayed/%d/%s", m, mqttDto.getTopic());
|
|
|
+ mqttDto.setTimeout(mqttDto.getTimeout() + (int) m);
|
|
|
+ }
|
|
|
+ //处理消息过期时间
|
|
|
+ command.set(LambdaUtils.getProperty(PaterDto::getTimeout), DataTime.getStringAround(0, 0, 0, 0, 0, mqttDto.getTimeout()));
|
|
|
+ //写入新的指令json
|
|
|
+ mqttDto.setCommand(command);
|
|
|
+ String workSn = command.getStr(LambdaUtils.getProperty(PaterDto::getWorkSn));
|
|
|
+ try {
|
|
|
+ //发送消息
|
|
|
+ mqttProducer.sendToMqtt(command.toString(), topic, mqttDto.getLevel());
|
|
|
+ //指令记录
|
|
|
+ saveMqttCommand(mqttDto, command.getBool(LambdaUtils.getProperty(PaterDto::getAck)) ? MqttEnum.COMMAND_SEND.getKey() : MqttEnum.COMMAND_SUCCESS.getKey(), null);
|
|
|
+ list.add(new MqttDto.Vo().setWorkSn(workSn).setResult(true));
|
|
|
+ } catch (Exception e) {
|
|
|
+ //指令记录
|
|
|
+ saveMqttCommand(mqttDto, MqttEnum.COMMAND_SEND_FAIL.getKey(), e.getMessage());
|
|
|
+ list.add(new MqttDto.Vo().setWorkSn(workSn).setResult(false));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return list;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @ApiOperation("指令结果回执")
|
|
|
+ public R mqttBack(BackMqttDto backMqttDto) {
|
|
|
+ MqttCommand mqttCommand = mqttCommandMapper.selectById(backMqttDto.getSn());
|
|
|
+ if (!Emptys.check(mqttCommand)) {
|
|
|
+ return R.fail("数据不存在");
|
|
|
+ }
|
|
|
+ //重复上报,前面的时间不能覆盖后面的时间
|
|
|
+ if (!StringUtils.isEmpty(mqttCommand.getExecTime())) {
|
|
|
+ if (DataTime.stringContrast(DataTime.toString(backMqttDto.getExecTime()), mqttCommand.getExecTime()) < 0) {
|
|
|
+ return R.ok();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ mqttCommand.setStatus(Integer.parseInt(backMqttDto.getBool() ? MqttEnum.COMMAND_SUCCESS.getKey() : MqttEnum.COMMAND_EXC_FAIL.getKey()))
|
|
|
+ .setExecTime(DataTime.toString(backMqttDto.getExecTime()));
|
|
|
+ mqttCommandMapper.updateById(mqttCommand);
|
|
|
+ return R.ok();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 新增指令记录
|
|
|
+ *
|
|
|
+ * @param mqttDto 参数对象
|
|
|
+ * @param status 状态
|
|
|
+ * @param error 错误信息
|
|
|
+ */
|
|
|
+ private void saveMqttCommand(MqttDto mqttDto, String status, String error) {
|
|
|
+ PaterDto command = mqttDto.getCommand().toBean(PaterDto.class);
|
|
|
+ MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), command)
|
|
|
+ .setId(command.getSn())
|
|
|
+ .setStatus(Integer.parseInt(status))
|
|
|
+ .setValue(mqttDto.getCommand().toString())
|
|
|
+ .setSendTime(command.getTime())
|
|
|
+ .setExecTime(status.equals(MqttEnum.COMMAND_SUCCESS.getKey()) ? DataTime.getSring() : null)
|
|
|
+ .setNum(1)
|
|
|
+ .setError(error);
|
|
|
+ mqttCommandMapper.insert(mqttCommand);
|
|
|
+ }
|
|
|
+}
|