|
@@ -15,6 +15,7 @@ import com.xy.dto.CommandMqtt;
|
|
import com.xy.dto.MqttDto;
|
|
import com.xy.dto.MqttDto;
|
|
import com.xy.dto.PaterDto;
|
|
import com.xy.dto.PaterDto;
|
|
import com.xy.entity.MqttCommand;
|
|
import com.xy.entity.MqttCommand;
|
|
|
|
+import com.xy.error.CommRuntimeException;
|
|
import com.xy.mapper.MqttCommandMapper;
|
|
import com.xy.mapper.MqttCommandMapper;
|
|
import com.xy.producer.CommProducer;
|
|
import com.xy.producer.CommProducer;
|
|
import com.xy.utils.*;
|
|
import com.xy.utils.*;
|
|
@@ -50,11 +51,19 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
|
|
|
|
|
|
private String keyStr = "mqtt:cmd:result:%d";
|
|
private String keyStr = "mqtt:cmd:result:%d";
|
|
|
|
|
|
|
|
+ private boolean snInit;
|
|
|
|
+
|
|
private long getSn() {
|
|
private long getSn() {
|
|
|
|
+ if (!snInit) {
|
|
|
|
+ throw new CommRuntimeException("sn号未初始化");
|
|
|
|
+ }
|
|
return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN);
|
|
return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN);
|
|
}
|
|
}
|
|
|
|
|
|
private long getWkSn() {
|
|
private long getWkSn() {
|
|
|
|
+ if (!snInit) {
|
|
|
|
+ throw new CommRuntimeException("sn号未初始化");
|
|
|
|
+ }
|
|
return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN);
|
|
return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -101,6 +110,7 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
|
|
@PostMapping("sysnAtoms")
|
|
@PostMapping("sysnAtoms")
|
|
@ApiOperation("同步sn号redis原子变量")
|
|
@ApiOperation("同步sn号redis原子变量")
|
|
public void sysnAtoms() {
|
|
public void sysnAtoms() {
|
|
|
|
+ //todo 集群有问题,暂不处理
|
|
String sn = LambdaUtils.getProperty(MqttCommand::getSn);
|
|
String sn = LambdaUtils.getProperty(MqttCommand::getSn);
|
|
QueryWrapper<MqttCommand> wrapper = new QueryWrapper<MqttCommand>()
|
|
QueryWrapper<MqttCommand> wrapper = new QueryWrapper<MqttCommand>()
|
|
.select(String.format("max(%s) as %s", sn, sn));
|
|
.select(String.format("max(%s) as %s", sn, sn));
|
|
@@ -109,7 +119,9 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
redisService.setAtomic(CommConsts.CMD_ATOM_SN, mqttCommand.getSn() + 1);
|
|
redisService.setAtomic(CommConsts.CMD_ATOM_SN, mqttCommand.getSn() + 1);
|
|
- redisService.setAtomic(CommConsts.CMD_ATOM_WK_SN, JSONUtil.parseObj(mqttCommand.getValue()).getLong("wkSn") + 1);
|
|
|
|
|
|
+ MqttCommand byId = getById(mqttCommand.getSn());
|
|
|
|
+ redisService.setAtomic(CommConsts.CMD_ATOM_WK_SN, byId.getWkSn() + 1);
|
|
|
|
+ snInit = true;
|
|
}
|
|
}
|
|
|
|
|
|
@Timer
|
|
@Timer
|
|
@@ -247,6 +259,7 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
|
|
PaterDto pater = mqttDto.getPater();
|
|
PaterDto pater = mqttDto.getPater();
|
|
MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater)
|
|
MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater)
|
|
.setSn(pater.getSn())
|
|
.setSn(pater.getSn())
|
|
|
|
+ .setWkSn(pater.getWkSn())
|
|
.setStatus(Integer.parseInt(status.getKey()))
|
|
.setStatus(Integer.parseInt(status.getKey()))
|
|
.setValue(value.toString())
|
|
.setValue(value.toString())
|
|
.setSendTime(pater.getTime())
|
|
.setSendTime(pater.getTime())
|