|
@@ -6,7 +6,7 @@ import cn.hutool.json.JSONObject;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
import com.xy.annotate.Runners;
|
|
|
import com.xy.annotate.Timer;
|
|
|
-import com.xy.dto.BackMqttDto;
|
|
|
+import com.xy.dto.AckMqttDto;
|
|
|
import com.xy.dto.CommandMqtt;
|
|
|
import com.xy.dto.MqttDto;
|
|
|
import com.xy.dto.PaterDto;
|
|
@@ -18,31 +18,33 @@ import com.xy.utils.consts.CommConsts;
|
|
|
import com.xy.utils.enums.MqttCommandStatusEnum;
|
|
|
import io.swagger.annotations.Api;
|
|
|
import io.swagger.annotations.ApiOperation;
|
|
|
-import lombok.AllArgsConstructor;
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.util.StringUtils;
|
|
|
+import org.springframework.validation.annotation.Validated;
|
|
|
import org.springframework.web.bind.annotation.PostMapping;
|
|
|
import org.springframework.web.bind.annotation.RequestBody;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
|
|
|
+import static com.xy.utils.Beans.copy;
|
|
|
import static com.xy.utils.EsBeans.toPageBean;
|
|
|
|
|
|
@Slf4j
|
|
|
@Service
|
|
|
@Api(tags = "mqtt接口")
|
|
|
-@AllArgsConstructor
|
|
|
+@RequiredArgsConstructor
|
|
|
public class MqttServiceImpl implements MqttService {
|
|
|
|
|
|
- private MqttProducer mqttProducer;
|
|
|
+ private final MqttProducer mqttProducer;
|
|
|
|
|
|
- private MqttCommandMapper mqttCommandMapper;
|
|
|
+ private final MqttCommandMapper mqttCommandMapper;
|
|
|
|
|
|
- private RedisService redisService;
|
|
|
+ private final RedisService<String> redisService;
|
|
|
+
|
|
|
+ private String keyStr = "mqtt:cmd:result:%d";
|
|
|
|
|
|
private long getSn() {
|
|
|
return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN);
|
|
@@ -52,9 +54,9 @@ public class MqttServiceImpl implements MqttService {
|
|
|
return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN);
|
|
|
}
|
|
|
|
|
|
- private Map<Long, Boolean> sendMqtt(List<MqttDto> mqttDtos) {
|
|
|
+ private List<Tuple.Tuple3<Long, Long, Boolean>> sendMqtt(List<MqttDto> mqttDtos) {
|
|
|
String time = DataTime.getSring();
|
|
|
- Map<Long, Boolean> map = new HashMap<>(mqttDtos.size());
|
|
|
+ List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>(mqttDtos.size());
|
|
|
mqttDtos.forEach(mqttDto -> {
|
|
|
//给予默认值
|
|
|
PaterDto pater = mqttDto.getPater()
|
|
@@ -77,17 +79,17 @@ public class MqttServiceImpl implements MqttService {
|
|
|
Long wkSn = pater.getWkSn();
|
|
|
try {
|
|
|
//指令记录
|
|
|
- saveMqttCommand(mqttDto, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_SUCCESS, null);
|
|
|
+ saveMqttCommand(mqttDto, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_EXE_SUCCESS, null);
|
|
|
//发送消息
|
|
|
mqttProducer.sendToMqtt(value.toString(), topic, mqttDto.getLevel());
|
|
|
- map.put(wkSn, true);
|
|
|
+ list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), true));
|
|
|
} catch (Exception e) {
|
|
|
//指令记录
|
|
|
saveMqttCommand(mqttDto, value, MqttCommandStatusEnum.COMMAND_SEND_FAIL, e.getMessage());
|
|
|
- map.put(wkSn, false);
|
|
|
+ list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), false));
|
|
|
}
|
|
|
});
|
|
|
- return map;
|
|
|
+ return list;
|
|
|
}
|
|
|
|
|
|
@Timer
|
|
@@ -110,7 +112,7 @@ public class MqttServiceImpl implements MqttService {
|
|
|
@Timer
|
|
|
@Override
|
|
|
@ApiOperation("发送指令")
|
|
|
- public R<Map<Long, Boolean>> senCommand(List<CommandMqtt> commandMqtts) {
|
|
|
+ public R<List<Tuple.Tuple3<Long, Long, Boolean>>> senCommand(List<CommandMqtt> commandMqtts) {
|
|
|
List<MqttDto> mqttDtos = new ArrayList<>();
|
|
|
commandMqtts.forEach(commandMqtt -> {
|
|
|
JSONObject templetObj = commandMqtt.getTemplet();
|
|
@@ -138,29 +140,40 @@ public class MqttServiceImpl implements MqttService {
|
|
|
.setPater(paterDto);
|
|
|
mqttDtos.add(mqttDto);
|
|
|
});
|
|
|
- Map<Long, Boolean> map = sendMqtt(mqttDtos);
|
|
|
- return R.ok(map);
|
|
|
+ return R.ok(sendMqtt(mqttDtos));
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- @ApiOperation("指令结果通知")
|
|
|
- public R commandBack(BackMqttDto backMqttDto) {
|
|
|
- MqttCommand mqttCommand = mqttCommandMapper.selectById(backMqttDto.getSn());
|
|
|
+ @ApiOperation("指令Ack")
|
|
|
+ @PostMapping("commandAck")
|
|
|
+ public R commandAck(@RequestBody @Validated AckMqttDto ackMqttDto) {
|
|
|
+ MqttCommand mqttCommand = mqttCommandMapper.selectById(ackMqttDto.getSn());
|
|
|
if (!Emptys.check(mqttCommand)) {
|
|
|
return R.fail("数据不存在");
|
|
|
}
|
|
|
- //重复上报,前面的时间不能覆盖后面的时间
|
|
|
- if (!StringUtils.isEmpty(mqttCommand.getExecTime())) {
|
|
|
- if (DataTime.stringContrast(DataTime.toString(backMqttDto.getExecTime()), mqttCommand.getExecTime()) < 0) {
|
|
|
- return R.ok();
|
|
|
- }
|
|
|
- }
|
|
|
- mqttCommand.setStatus(Integer.parseInt(backMqttDto.getBool() ? MqttCommandStatusEnum.COMMAND_SUCCESS.getKey() : MqttCommandStatusEnum.COMMAND_EXC_FAIL.getKey()))
|
|
|
- .setExecTime(DataTime.toString(backMqttDto.getExecTime()));
|
|
|
+ //写入es
|
|
|
+ mqttCommand.setStatus(Integer.valueOf(ackMqttDto.getStatusEnum().getKey()))
|
|
|
+ .setBackClientTime(DataTime.toString(ackMqttDto.getBackClientTime()))
|
|
|
+ .setBackServerTime(DataTime.getSring());
|
|
|
mqttCommandMapper.updateById(mqttCommand);
|
|
|
+ //写入redis
|
|
|
+ String result = ackMqttDto.getResult().toString();
|
|
|
+ if (Emptys.check(ackMqttDto.getResult())) {
|
|
|
+ String key = String.format(keyStr, ackMqttDto.getSn());
|
|
|
+ redisService.set(key, result);
|
|
|
+ }
|
|
|
return R.ok();
|
|
|
}
|
|
|
|
|
|
+ @PostMapping("snByCmdAndResult")
|
|
|
+ @ApiOperation("sn查询指令和结果数据")
|
|
|
+ public R<MqttDto.Vo3> snByCmdAndResult(@RequestBody MqttDto.SnByCmdAndResult snByCmdAndResult) {
|
|
|
+ MqttCommand mqttCommand = mqttCommandMapper.selectById(snByCmdAndResult.getSn());
|
|
|
+ String key = String.format(keyStr, snByCmdAndResult.getSn());
|
|
|
+ String result = redisService.get(key);
|
|
|
+ MqttDto.Vo3 vo3 = copy(MqttDto.Vo3.class, mqttCommand).setResult(JSONUtil.parseObj(result));
|
|
|
+ return R.ok(vo3);
|
|
|
+ }
|
|
|
+
|
|
|
@PostMapping("page")
|
|
|
@ApiOperation("分页查询")
|
|
|
public R<PageBean<MqttDto.Vo2>> page(@RequestBody MqttDto.Page page) {
|
|
@@ -196,7 +209,6 @@ public class MqttServiceImpl implements MqttService {
|
|
|
.setStatus(Integer.parseInt(status.getKey()))
|
|
|
.setValue(value.toString())
|
|
|
.setSendTime(pater.getTime())
|
|
|
- .setExecTime(status.getKey().equals(MqttCommandStatusEnum.COMMAND_SUCCESS.getKey()) ? DataTime.getSring() : null)
|
|
|
.setNum(1)
|
|
|
.setError(error);
|
|
|
mqttCommandMapper.insert(mqttCommand);
|