|
@@ -6,30 +6,28 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
import com.baomidou.mybatisplus.core.metadata.IPage;
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
-import com.xy.alipay.AliPay;
|
|
|
import com.xy.annotate.Runners;
|
|
|
import com.xy.annotate.Timer;
|
|
|
+import com.xy.collections.list.JArrayList;
|
|
|
+import com.xy.collections.list.JList;
|
|
|
import com.xy.collections.map.JHashMap;
|
|
|
import com.xy.collections.map.JMap;
|
|
|
import com.xy.consumer.cmd.CmdProducer;
|
|
|
+import com.xy.device.EnumDeviceType;
|
|
|
import com.xy.dto.AckMqttDto;
|
|
|
import com.xy.dto.CommandMqtt;
|
|
|
import com.xy.dto.MqttDto;
|
|
|
import com.xy.dto.PaterDto;
|
|
|
+import com.xy.entity.DeviceInfo;
|
|
|
import com.xy.entity.MqttCommand;
|
|
|
import com.xy.error.CommRuntimeException;
|
|
|
import com.xy.mapper.MqttCommandMapper;
|
|
|
-import com.xy.service.factory.CmdService;
|
|
|
-import com.xy.service.factory.QualityCmdServiceImpl;
|
|
|
-import com.xy.service.factory.SetsCmdServiceImpl;
|
|
|
-import com.xy.service.factory.TaskCmdServiceImpl;
|
|
|
+import com.xy.service.factory.cmd.service.CmdService;
|
|
|
+import com.xy.service.factory.device.service.DeviceFactory;
|
|
|
import com.xy.utils.*;
|
|
|
-import com.xy.utils.CommConsts;
|
|
|
import com.xy.utils.enums.MqttCommandStatusEnum;
|
|
|
import io.swagger.annotations.Api;
|
|
|
import io.swagger.annotations.ApiOperation;
|
|
|
-import lombok.AllArgsConstructor;
|
|
|
-import lombok.Getter;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.stereotype.Service;
|
|
@@ -39,7 +37,6 @@ import org.springframework.web.bind.annotation.PostMapping;
|
|
|
import org.springframework.web.bind.annotation.RequestBody;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
@@ -55,25 +52,27 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
|
|
|
|
|
|
private final RedisService<String> redisService;
|
|
|
|
|
|
+ private final DeviceInfoServiceImpl deviceInfoService;
|
|
|
+
|
|
|
private String keyStr = "mqtt:cmd:result:%d";
|
|
|
|
|
|
private boolean snInit;
|
|
|
|
|
|
- private long getSn() {
|
|
|
+ public long getSn() {
|
|
|
if (!snInit) {
|
|
|
throw new CommRuntimeException("sn号未初始化");
|
|
|
}
|
|
|
return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN);
|
|
|
}
|
|
|
|
|
|
- private long getWkSn() {
|
|
|
+ public long getWkSn() {
|
|
|
if (!snInit) {
|
|
|
throw new CommRuntimeException("sn号未初始化");
|
|
|
}
|
|
|
return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN);
|
|
|
}
|
|
|
|
|
|
- private List<Tuple.Tuple3<Long, Long, Boolean>> sendMqtt(List<MqttDto> mqttDtos) {
|
|
|
+ public List<Tuple.Tuple3<Long, Long, Boolean>> sendMqtt(List<MqttDto> mqttDtos) {
|
|
|
String time = DataTime.getSring();
|
|
|
List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>(mqttDtos.size());
|
|
|
mqttDtos.forEach(mqttDto -> {
|
|
@@ -111,72 +110,30 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
|
|
|
return list;
|
|
|
}
|
|
|
|
|
|
- @Timer
|
|
|
- @Runners
|
|
|
- @PostMapping("sysnAtoms")
|
|
|
- @ApiOperation("同步sn号redis原子变量")
|
|
|
- public void sysnAtoms() {
|
|
|
- //todo 集群有问题,暂不处理
|
|
|
- String sn = LambdaUtils.getProperty(MqttCommand::getSn);
|
|
|
- QueryWrapper<MqttCommand> wrapper = new QueryWrapper<MqttCommand>()
|
|
|
- .select(String.format("max(%s) as %s", sn, sn));
|
|
|
- MqttCommand mqttCommand = getOne(wrapper);
|
|
|
- if (mqttCommand != null) {
|
|
|
- redisService.setAtomic(CommConsts.CMD_ATOM_SN, mqttCommand.getSn() + 1);
|
|
|
- MqttCommand byId = getById(mqttCommand.getSn());
|
|
|
- redisService.setAtomic(CommConsts.CMD_ATOM_WK_SN, byId.getWkSn() + 1);
|
|
|
- }
|
|
|
- snInit = true;
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
@ApiOperation("指令发送")
|
|
|
- @AliPay(AliPay.Type.RESTART)
|
|
|
public R<List<Tuple.Tuple3<Long, Long, Boolean>>> senCommand(List<CommandMqtt> commandMqtts) {
|
|
|
- //发送指令
|
|
|
- List<MqttDto> mqttDtos = new ArrayList<>();
|
|
|
+ JList<CommandMqtt> commandMqttsList = new JArrayList<>(commandMqtts);
|
|
|
+ List<DeviceInfo> deviceInfos = deviceInfoService.list(new LambdaQueryWrapper<DeviceInfo>().in(DeviceInfo::getDeviceId, commandMqttsList.getProperty(CommandMqtt::getDeviceId).comparing()));
|
|
|
+ JMap<Long, DeviceInfo> deviceInfosJMaps = new JArrayList<>(deviceInfos).toMap(DeviceInfo::getDeviceId).cover();
|
|
|
+ JMap<Integer, JList<CommandMqtt>> jMap = new JHashMap<>();
|
|
|
commandMqtts.forEach(commandMqtt -> {
|
|
|
- JSONObject templetObj = commandMqtt.getTemplet();
|
|
|
- String debug = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDebug));
|
|
|
- String level = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getLevel));
|
|
|
- String delayTime = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDelayTime));
|
|
|
- String timeout = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getTimeout));
|
|
|
- String wkSn = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getWkSn));
|
|
|
- String actionType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getActionType));
|
|
|
- String cmdType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getCmdType));
|
|
|
- String ack = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getAck));
|
|
|
- JSONObject data = templetObj.getJSONObject(LambdaUtils.getProperty(PaterDto::getData));
|
|
|
- PaterDto paterDto = new PaterDto()
|
|
|
- .setActionType(actionType)
|
|
|
- .setCmdType(cmdType)
|
|
|
- .setAck(Emptys.check(ack) && !ack.equals("$ack$") ? Boolean.valueOf(ack) : Emptys.check(ack) && ack.equals("$ack$") ? true : true)
|
|
|
- .setWkSn(!Emptys.check(wkSn) ? getWkSn() : "$wkSn$".equals(wkSn) ? getWkSn() : Long.valueOf(wkSn))
|
|
|
- .setData(data);
|
|
|
- MqttDto mqttDto = new MqttDto()
|
|
|
- .setDeviceId(commandMqtt.getDeviceId())
|
|
|
- .setDebug(Emptys.check(debug) && !debug.equals("$debug$") ? Boolean.valueOf(debug) : Emptys.check(debug) && debug.equals("$debug$") ? false : false)
|
|
|
- .setLevel(Emptys.check(level) && !level.equals("$level$") ? Integer.valueOf(level) : Emptys.check(level) && level.equals("$level$") ? 1 : 1)
|
|
|
- .setDelayTime(Emptys.check(delayTime) && !delayTime.equals("$delayTime$") ? delayTime : Emptys.check(delayTime) && delayTime.equals("$delayTime$") ? null : null)
|
|
|
- .setTimeout(Emptys.check(timeout) && !timeout.equals("$timeout$") ? Integer.valueOf(timeout) : Emptys.check(timeout) && timeout.equals("$timeout$") ? 60 : 60)
|
|
|
- .setPater(paterDto);
|
|
|
- mqttDtos.add(mqttDto);
|
|
|
- });
|
|
|
- List<Tuple.Tuple3<Long, Long, Boolean>> list = sendMqtt(mqttDtos);
|
|
|
- //执行业务
|
|
|
- Map<String, List<MqttDto>> map = new HashMap<>();
|
|
|
- mqttDtos.forEach(mqttDto -> {
|
|
|
- String cmdType = mqttDto.getPater().getCmdType();
|
|
|
- if (!map.containsKey(cmdType)) {
|
|
|
- map.put(cmdType, new ArrayList<>());
|
|
|
+ DeviceInfo deviceInfo = deviceInfosJMaps.get(commandMqtt.getDeviceId());
|
|
|
+ if (deviceInfo == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (!jMap.containsKey(deviceInfo.getDeviceType())) {
|
|
|
+ jMap.put(deviceInfo.getDeviceType(), new JArrayList<>());
|
|
|
}
|
|
|
- map.get(cmdType).add(mqttDto);
|
|
|
+ jMap.get(deviceInfo.getDeviceType()).set(commandMqtt);
|
|
|
});
|
|
|
- map.forEach((k, v) -> {
|
|
|
- CmdTypeEnum anEnum = CmdTypeEnum.getEnum(k);
|
|
|
- if (anEnum == null) {
|
|
|
- return;
|
|
|
+ List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>();
|
|
|
+ jMap.forEach((deviceType, commandMqttss) -> {
|
|
|
+ List<Tuple.Tuple3<Long, Long, Boolean>> data = FactoryUtils.getService(() -> deviceType == EnumDeviceType.N_5.getIntCode() ? 2 : 1, DeviceFactory.class)
|
|
|
+ .senCommand(commandMqttss).getData();
|
|
|
+ if (data != null) {
|
|
|
+ list.addAll(data);
|
|
|
}
|
|
|
- SpringBeanUtils.getBean(anEnum.getCmdServiceClass()).senCommand(v);
|
|
|
});
|
|
|
return R.ok(list);
|
|
|
}
|
|
@@ -215,7 +172,7 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
|
|
|
@ApiOperation("指令结果通知")
|
|
|
@PostMapping("commandResultBack")
|
|
|
public R commandResultBack(@RequestBody @Validated MqttDto.ResultBack resultBack) {
|
|
|
- return SpringBeanUtils.getBean(CmdTypeEnum.getEnum(resultBack.getCmdType()).getCmdServiceClass()).resultBack(resultBack);
|
|
|
+ return FactoryUtils.getService(resultBack.getCmdType(), CmdService.class).resultBack(resultBack);
|
|
|
}
|
|
|
|
|
|
@PostMapping("snByCmdAndResult")
|
|
@@ -275,49 +232,21 @@ public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand>
|
|
|
}
|
|
|
|
|
|
|
|
|
- /**
|
|
|
- * 工厂枚举
|
|
|
- */
|
|
|
- @Getter
|
|
|
- @AllArgsConstructor
|
|
|
- public enum CmdTypeEnum {
|
|
|
-
|
|
|
- /**
|
|
|
- * 质检指令
|
|
|
- */
|
|
|
- QUALITY("quality", QualityCmdServiceImpl.class),
|
|
|
-
|
|
|
- /**
|
|
|
- * 任务指令
|
|
|
- */
|
|
|
- TASK("task", TaskCmdServiceImpl.class),
|
|
|
-
|
|
|
- /**
|
|
|
- * 设置指令
|
|
|
- */
|
|
|
- SETS("sets", SetsCmdServiceImpl.class),
|
|
|
-
|
|
|
- ;
|
|
|
-
|
|
|
- /**
|
|
|
- * 指令业务类型
|
|
|
- */
|
|
|
- private String cmdType;
|
|
|
-
|
|
|
- /**
|
|
|
- * 接口实现
|
|
|
- */
|
|
|
- private Class<? extends CmdService> cmdServiceClass;
|
|
|
-
|
|
|
-
|
|
|
- public static CmdTypeEnum getEnum(String cmdType) {
|
|
|
- CmdTypeEnum[] values = CmdTypeEnum.values();
|
|
|
- for (CmdTypeEnum qualityEnum : values) {
|
|
|
- if (qualityEnum.cmdType.equals(cmdType)) {
|
|
|
- return qualityEnum;
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
+ @Timer
|
|
|
+ @Runners
|
|
|
+ @PostMapping("sysnAtoms")
|
|
|
+ @ApiOperation("同步sn号redis原子变量")
|
|
|
+ public void sysnAtoms() {
|
|
|
+ //todo 集群有问题,暂不处理
|
|
|
+ String sn = LambdaUtils.getProperty(MqttCommand::getSn);
|
|
|
+ QueryWrapper<MqttCommand> wrapper = new QueryWrapper<MqttCommand>()
|
|
|
+ .select(String.format("max(%s) as %s", sn, sn));
|
|
|
+ MqttCommand mqttCommand = getOne(wrapper);
|
|
|
+ if (mqttCommand != null) {
|
|
|
+ redisService.setAtomic(CommConsts.CMD_ATOM_SN, mqttCommand.getSn() + 1);
|
|
|
+ MqttCommand byId = getById(mqttCommand.getSn());
|
|
|
+ redisService.setAtomic(CommConsts.CMD_ATOM_WK_SN, byId.getWkSn() + 1);
|
|
|
}
|
|
|
+ snInit = true;
|
|
|
}
|
|
|
}
|