|
@@ -2,6 +2,7 @@ package com.xy.service;
|
|
|
|
|
|
import cn.easyes.core.biz.EsPageInfo;
|
|
|
import cn.easyes.core.conditions.LambdaEsQueryWrapper;
|
|
|
+import cn.hutool.json.JSONArray;
|
|
|
import cn.hutool.json.JSONObject;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
import com.github.yitter.idgen.YitIdHelper;
|
|
@@ -10,13 +11,13 @@ import com.xy.collections.map.JConcurrentHashMap;
|
|
|
import com.xy.collections.map.JMap;
|
|
|
import com.xy.config.ThreadPoolConfig;
|
|
|
import com.xy.dto.BackMqttDto;
|
|
|
+import com.xy.dto.CommandMqtt;
|
|
|
import com.xy.dto.MqttDto;
|
|
|
import com.xy.dto.PaterDto;
|
|
|
import com.xy.entity.MqttCommand;
|
|
|
import com.xy.mapper.MqttCommandMapper;
|
|
|
import com.xy.producer.MqttProducer;
|
|
|
import com.xy.utils.*;
|
|
|
-import com.xy.utils.consts.CommConsts;
|
|
|
import com.xy.utils.enums.MqttCommandStatusEnum;
|
|
|
import io.swagger.annotations.Api;
|
|
|
import io.swagger.annotations.ApiOperation;
|
|
@@ -27,6 +28,7 @@ import org.springframework.util.StringUtils;
|
|
|
import org.springframework.web.bind.annotation.PostMapping;
|
|
|
import org.springframework.web.bind.annotation.RequestBody;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
@@ -42,10 +44,7 @@ public class MqttServiceImpl implements MqttService {
|
|
|
|
|
|
private MqttCommandMapper mqttCommandMapper;
|
|
|
|
|
|
- @Timer
|
|
|
- @Override
|
|
|
- @ApiOperation("指令发布")
|
|
|
- public Map<String, Boolean> sendMqtt(List<MqttDto> mqttDtos) {
|
|
|
+ private Map<String, Boolean> sendMqtt(List<MqttDto> mqttDtos) {
|
|
|
String time = DataTime.getSring();
|
|
|
JMap<String, Boolean> map = new JConcurrentHashMap<>(mqttDtos.size());
|
|
|
ThreadPoolUtils.Execute execute = ThreadPoolUtils.excPoll(ThreadPoolConfig.SEND_MQTT_POLL, mqttDtos.size());
|
|
@@ -54,7 +53,8 @@ public class MqttServiceImpl implements MqttService {
|
|
|
PaterDto pater = mqttDto.getPater()
|
|
|
.setSn(String.valueOf(YitIdHelper.nextId()))
|
|
|
.setTime(time)
|
|
|
- .setDeviceId(mqttDto.getDeviceId());
|
|
|
+ .setDeviceId(mqttDto.getDeviceId())
|
|
|
+ .setDebug(mqttDto.getDebug());
|
|
|
//延迟发布处理
|
|
|
String delayTime = mqttDto.getDelayTime();
|
|
|
String topic = mqttDto.getDeviceId().toString(); //+ CommConsts.DEVICE_MQTT_TOPIC_SUFFIX;
|
|
@@ -84,9 +84,44 @@ public class MqttServiceImpl implements MqttService {
|
|
|
return map;
|
|
|
}
|
|
|
|
|
|
+ @Timer
|
|
|
+ @Override
|
|
|
+ @ApiOperation("发送指令")
|
|
|
+ public R<Map<String, Boolean>> senCommand(List<CommandMqtt> commandMqtts) {
|
|
|
+ List<MqttDto> mqttDtos = new ArrayList<>();
|
|
|
+ for (CommandMqtt commandMqtt : commandMqtts) {
|
|
|
+ JSONObject templetObj = JSONUtil.parseObj(commandMqtt.getTemplet());
|
|
|
+ String debug = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDebug));
|
|
|
+ String level = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getLevel));
|
|
|
+ String delayTime = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDelayTime));
|
|
|
+ String timeout = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getTimeout));
|
|
|
+ String wkSn = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getWkSn));
|
|
|
+ String actionType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getActionType));
|
|
|
+ String cmdType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getCmdType));
|
|
|
+ String ack = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getAck));
|
|
|
+ JSONArray data = templetObj.getJSONArray(LambdaUtils.getProperty(PaterDto::getData));
|
|
|
+ PaterDto paterDto = new PaterDto()
|
|
|
+ .setActionType(actionType)
|
|
|
+ .setCmdType(cmdType)
|
|
|
+ .setAck(Emptys.check(ack) && !ack.equals("$ack$") ? Boolean.valueOf(ack) : Emptys.check(ack) && ack.equals("$ack$") ? true : true)
|
|
|
+ .setWkSn(!Emptys.check(wkSn) ? String.valueOf(YitIdHelper.nextId()) : "$wkSn$".equals(wkSn) ? String.valueOf(YitIdHelper.nextId()) : wkSn)
|
|
|
+ .setData(data);
|
|
|
+ MqttDto mqttDto = new MqttDto()
|
|
|
+ .setDeviceId(commandMqtt.getDeviceId())
|
|
|
+ .setDebug(Emptys.check(debug) && !debug.equals("$debug$") ? Boolean.valueOf(debug) : Emptys.check(debug) && debug.equals("$debug$") ? false : false)
|
|
|
+ .setLevel(Emptys.check(level) && !level.equals("$level$") ? Integer.valueOf(level) : Emptys.check(level) && level.equals("$level$") ? 1 : 1)
|
|
|
+ .setDelayTime(Emptys.check(delayTime) && !delayTime.equals("$delayTime$") ? delayTime : Emptys.check(delayTime) && delayTime.equals("$delayTime$") ? null : null)
|
|
|
+ .setTimeout(Emptys.check(timeout) && !timeout.equals("$timeout$") ? Integer.valueOf(timeout) : Emptys.check(timeout) && timeout.equals("$timeout$") ? 60 : 60)
|
|
|
+ .setPater(paterDto);
|
|
|
+ mqttDtos.add(mqttDto);
|
|
|
+ }
|
|
|
+ Map<String, Boolean> map = sendMqtt(mqttDtos);
|
|
|
+ return R.ok(map);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
@ApiOperation("指令结果回执")
|
|
|
- public R mqttBack(BackMqttDto backMqttDto) {
|
|
|
+ public R commandBack(BackMqttDto backMqttDto) {
|
|
|
MqttCommand mqttCommand = mqttCommandMapper.selectById(backMqttDto.getSn());
|
|
|
if (!Emptys.check(mqttCommand)) {
|
|
|
return R.fail("数据不存在");
|