|
@@ -1,6 +1,7 @@
|
|
package com.xy.service.impl;
|
|
package com.xy.service.impl;
|
|
|
|
|
|
import cn.hutool.json.JSONObject;
|
|
import cn.hutool.json.JSONObject;
|
|
|
|
+import cn.hutool.json.JSONUtil;
|
|
import com.github.yitter.idgen.YitIdHelper;
|
|
import com.github.yitter.idgen.YitIdHelper;
|
|
import com.xy.dto.BackMqttDto;
|
|
import com.xy.dto.BackMqttDto;
|
|
import com.xy.dto.MqttDto;
|
|
import com.xy.dto.MqttDto;
|
|
@@ -10,16 +11,18 @@ import com.xy.mapper.MqttCommandMapper;
|
|
import com.xy.producer.MqttProducer;
|
|
import com.xy.producer.MqttProducer;
|
|
import com.xy.service.MqttService;
|
|
import com.xy.service.MqttService;
|
|
import com.xy.utils.*;
|
|
import com.xy.utils.*;
|
|
-import com.xy.utils.enums.MqttEnum;
|
|
|
|
|
|
+import com.xy.utils.enums.MqttCommandStatusEnum;
|
|
import io.swagger.annotations.Api;
|
|
import io.swagger.annotations.Api;
|
|
import io.swagger.annotations.ApiOperation;
|
|
import io.swagger.annotations.ApiOperation;
|
|
import lombok.AllArgsConstructor;
|
|
import lombok.AllArgsConstructor;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.util.StringUtils;
|
|
import org.springframework.util.StringUtils;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
|
|
|
|
+@Slf4j
|
|
@Service
|
|
@Service
|
|
@Api(tags = "mqtt接口")
|
|
@Api(tags = "mqtt接口")
|
|
@AllArgsConstructor
|
|
@AllArgsConstructor
|
|
@@ -35,33 +38,35 @@ public class MqttServiceImpl implements MqttService {
|
|
List<MqttDto.Vo> list = new ArrayList<>(mqttDtos.size());
|
|
List<MqttDto.Vo> list = new ArrayList<>(mqttDtos.size());
|
|
mqttDtos.forEach(mqttDto -> {
|
|
mqttDtos.forEach(mqttDto -> {
|
|
//给予默认值
|
|
//给予默认值
|
|
- JSONObject command = mqttDto.getCommand();
|
|
|
|
- command.set(LambdaUtils.getProperty(PaterDto::getSn), YitIdHelper.nextId())
|
|
|
|
- .set(LambdaUtils.getProperty(PaterDto::getTime), DataTime.getSring());
|
|
|
|
|
|
+ PaterDto pater = mqttDto.getPater()
|
|
|
|
+ .setSn(String.valueOf(YitIdHelper.nextId()))
|
|
|
|
+ .setTime(DataTime.getSring());
|
|
//延迟发布处理
|
|
//延迟发布处理
|
|
String delayTime = mqttDto.getDelayTime();
|
|
String delayTime = mqttDto.getDelayTime();
|
|
String topic = mqttDto.getTopic();
|
|
String topic = mqttDto.getTopic();
|
|
if (!StringUtils.isEmpty(delayTime)) {
|
|
if (!StringUtils.isEmpty(delayTime)) {
|
|
//消息消费时间 - 发布时间的差值 = 距今被消费的秒数
|
|
//消息消费时间 - 发布时间的差值 = 距今被消费的秒数
|
|
- long m = DataTime.diff(DataTime.toLocal(command.getStr(LambdaUtils.getProperty(PaterDto::getTime))), DataTime.toLocal(delayTime), "s");
|
|
|
|
|
|
+ long m = DataTime.diff(DataTime.toLocal(pater.getTime()), DataTime.toLocal(delayTime), "s");
|
|
topic = String.format("$delayed/%d/%s", m, mqttDto.getTopic());
|
|
topic = String.format("$delayed/%d/%s", m, mqttDto.getTopic());
|
|
mqttDto.setTimeout(mqttDto.getTimeout() + (int) m);
|
|
mqttDto.setTimeout(mqttDto.getTimeout() + (int) m);
|
|
}
|
|
}
|
|
//处理消息过期时间
|
|
//处理消息过期时间
|
|
- command.set(LambdaUtils.getProperty(PaterDto::getTimeout), DataTime.getStringAround(0, 0, 0, 0, 0, mqttDto.getTimeout()));
|
|
|
|
- //写入新的指令json
|
|
|
|
- mqttDto.setCommand(command);
|
|
|
|
- String workSn = command.getStr(LambdaUtils.getProperty(PaterDto::getWorkSn));
|
|
|
|
|
|
+ pater.setTimeout(DataTime.getStringAround(0, 0, 0, 0, 0, mqttDto.getTimeout()));
|
|
|
|
+ //翻译枚举
|
|
|
|
+ JSONObject value = JSONUtil.parseObj(pater)
|
|
|
|
+ .set(LambdaUtils.getProperty(PaterDto::getType1), pater.getType1().getKey())
|
|
|
|
+ .set(LambdaUtils.getProperty(PaterDto::getType2), pater.getType2().getKey());
|
|
|
|
+ String wkSn = pater.getWkSn();
|
|
try {
|
|
try {
|
|
//发送消息
|
|
//发送消息
|
|
- mqttProducer.sendToMqtt(command.toString(), topic, mqttDto.getLevel());
|
|
|
|
|
|
+ mqttProducer.sendToMqtt(value.toString(), topic, mqttDto.getLevel());
|
|
//指令记录
|
|
//指令记录
|
|
- saveMqttCommand(mqttDto, command.getBool(LambdaUtils.getProperty(PaterDto::getAck)) ? MqttEnum.COMMAND_SEND.getKey() : MqttEnum.COMMAND_SUCCESS.getKey(), null);
|
|
|
|
- list.add(new MqttDto.Vo().setWorkSn(workSn).setResult(true));
|
|
|
|
|
|
+ saveMqttCommand(mqttDto, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND.getKey() : MqttCommandStatusEnum.COMMAND_SUCCESS.getKey(), null);
|
|
|
|
+ list.add(new MqttDto.Vo().setWorkSn(wkSn).setResult(true));
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
//指令记录
|
|
//指令记录
|
|
- saveMqttCommand(mqttDto, MqttEnum.COMMAND_SEND_FAIL.getKey(), e.getMessage());
|
|
|
|
- list.add(new MqttDto.Vo().setWorkSn(workSn).setResult(false));
|
|
|
|
|
|
+ saveMqttCommand(mqttDto, value, MqttCommandStatusEnum.COMMAND_SEND_FAIL.getKey(), e.getMessage());
|
|
|
|
+ list.add(new MqttDto.Vo().setWorkSn(wkSn).setResult(false));
|
|
}
|
|
}
|
|
});
|
|
});
|
|
return list;
|
|
return list;
|
|
@@ -80,7 +85,7 @@ public class MqttServiceImpl implements MqttService {
|
|
return R.ok();
|
|
return R.ok();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- mqttCommand.setStatus(Integer.parseInt(backMqttDto.getBool() ? MqttEnum.COMMAND_SUCCESS.getKey() : MqttEnum.COMMAND_EXC_FAIL.getKey()))
|
|
|
|
|
|
+ mqttCommand.setStatus(Integer.parseInt(backMqttDto.getBool() ? MqttCommandStatusEnum.COMMAND_SUCCESS.getKey() : MqttCommandStatusEnum.COMMAND_EXC_FAIL.getKey()))
|
|
.setExecTime(DataTime.toString(backMqttDto.getExecTime()));
|
|
.setExecTime(DataTime.toString(backMqttDto.getExecTime()));
|
|
mqttCommandMapper.updateById(mqttCommand);
|
|
mqttCommandMapper.updateById(mqttCommand);
|
|
return R.ok();
|
|
return R.ok();
|
|
@@ -90,17 +95,20 @@ public class MqttServiceImpl implements MqttService {
|
|
* 新增指令记录
|
|
* 新增指令记录
|
|
*
|
|
*
|
|
* @param mqttDto 参数对象
|
|
* @param mqttDto 参数对象
|
|
|
|
+ * @param value 参数json对象
|
|
* @param status 状态
|
|
* @param status 状态
|
|
* @param error 错误信息
|
|
* @param error 错误信息
|
|
*/
|
|
*/
|
|
- private void saveMqttCommand(MqttDto mqttDto, String status, String error) {
|
|
|
|
- PaterDto command = mqttDto.getCommand().toBean(PaterDto.class);
|
|
|
|
- MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), command)
|
|
|
|
- .setId(command.getSn())
|
|
|
|
|
|
+ private void saveMqttCommand(MqttDto mqttDto, JSONObject value, String status, String error) {
|
|
|
|
+ PaterDto pater = mqttDto.getPater();
|
|
|
|
+ MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater)
|
|
|
|
+ .setId(pater.getSn())
|
|
|
|
+ .setType1(pater.getType1().getKey())
|
|
|
|
+ .setType2(pater.getType2().getKey())
|
|
.setStatus(Integer.parseInt(status))
|
|
.setStatus(Integer.parseInt(status))
|
|
- .setValue(mqttDto.getCommand().toString())
|
|
|
|
- .setSendTime(command.getTime())
|
|
|
|
- .setExecTime(status.equals(MqttEnum.COMMAND_SUCCESS.getKey()) ? DataTime.getSring() : null)
|
|
|
|
|
|
+ .setValue(value.toString())
|
|
|
|
+ .setSendTime(pater.getTime())
|
|
|
|
+ .setExecTime(status.equals(MqttCommandStatusEnum.COMMAND_SUCCESS.getKey()) ? DataTime.getSring() : null)
|
|
.setNum(1)
|
|
.setNum(1)
|
|
.setError(error);
|
|
.setError(error);
|
|
mqttCommandMapper.insert(mqttCommand);
|
|
mqttCommandMapper.insert(mqttCommand);
|