123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- package com.xy.service;
- import cn.hutool.json.JSONObject;
- import cn.hutool.json.JSONUtil;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.metadata.IPage;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import com.xy.annotation.LogOperate;
- import com.xy.collections.list.JArrayList;
- import com.xy.collections.list.JList;
- import com.xy.collections.map.JHashMap;
- import com.xy.collections.map.JMap;
- import com.xy.dto.*;
- import com.xy.entity.DeviceInfo;
- import com.xy.entity.MqttCommand;
- import com.xy.mapper.MqttCommandMapper;
- import com.xy.service.factory.cmd.CmdService;
- import com.xy.service.factory.device.DeviceFactory;
- import com.xy.utils.*;
- import com.xy.utils.enums.MqttCommandStatusEnum;
- import io.swagger.annotations.Api;
- import io.swagger.annotations.ApiOperation;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import org.springframework.util.StringUtils;
- import org.springframework.validation.annotation.Validated;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestBody;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import static com.xy.utils.PlusBeans.*;
- @Slf4j
- @Service
- @Api(tags = "mqtt接口")
- @RequiredArgsConstructor
- public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand> implements MqttService {
- private final DeviceMqttSendService deviceMqttSendService;
- private final RedisService<String> redisService;
- private final DeviceInfoServiceImpl deviceInfoService;
- private String keyStr = "mqtt:cmd:result:%d";
- public long getSn() {
- return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN);
- }
- public long getWkSn() {
- return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN);
- }
- public List<Tuple.Tuple3<Long, Long, Boolean>> sendMqtt(List<MqttCmdDto> mqttDtos) {
- String time = DataTime.getSring();
- List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>(mqttDtos.size());
- mqttDtos.forEach(mqttDto -> {
- //给予默认值
- PaterDto pater = mqttDto.getPater()
- .setSn(getSn())
- .setTime(time)
- .setDeviceId(mqttDto.getDeviceId())
- .setDebug(mqttDto.getDebug());
- //延迟发布处理
- String delayTime = mqttDto.getDelayTime();
- String topic = mqttDto.getDeviceId() + CommConsts.DEVICE_MQTT_TOPIC_SUFFIX;
- if (!StringUtils.isEmpty(delayTime)) {
- //消息消费时间 - 发布时间的差值 = 距今被消费的秒数
- long m = DataTime.diff(DataTime.toLocal(pater.getTime()), DataTime.toLocal(delayTime), "s");
- topic = String.format("$delayed/%d/%s", m, topic);
- mqttDto.setTimeout(mqttDto.getTimeout() + (int) m);
- }
- //消息过期时间
- pater.setTimeout(mqttDto.getTimeout());
- JSONObject value = JSONUtil.parseObj(pater);
- Long wkSn = pater.getWkSn();
- try {
- //指令记录
- saveMqttCommand(mqttDto, topic, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_EXE_SUCCESS, null);
- //发送消息
- deviceMqttSendService.cmd(new MqttDto.CmdParams()
- .setQos(mqttDto.getLevel())
- .setData(value.toString())
- .setTopic(topic)
- );
- list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), true));
- } catch (Exception e) {
- //指令记录
- saveMqttCommand(mqttDto, topic, value, MqttCommandStatusEnum.COMMAND_SEND_FAIL, e.getMessage());
- list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), false));
- }
- });
- return list;
- }
- @Override
- @LogOperate
- @ApiOperation("指令发送")
- public R<List<Tuple.Tuple3<Long, Long, Boolean>>> senCommand(List<CommandMqtt> commandMqtts) {
- JList<CommandMqtt> commandMqttsList = new JArrayList<>(commandMqtts);
- List<DeviceInfo> deviceInfos = deviceInfoService.list(new LambdaQueryWrapper<DeviceInfo>().in(DeviceInfo::getDeviceId, commandMqttsList.getProperty(CommandMqtt::getDeviceId).comparing()));
- JMap<Long, DeviceInfo> deviceInfosJMaps = new JArrayList<>(deviceInfos).toMap(DeviceInfo::getDeviceId).cover();
- JMap<Integer, JList<CommandMqtt>> jMap = new JHashMap<>();
- commandMqtts.forEach(commandMqtt -> {
- DeviceInfo deviceInfo = deviceInfosJMaps.get(commandMqtt.getDeviceId());
- if (deviceInfo == null) {
- return;
- }
- if (!jMap.containsKey(deviceInfo.getDeviceType())) {
- jMap.put(deviceInfo.getDeviceType(), new JArrayList<>());
- }
- jMap.get(deviceInfo.getDeviceType()).set(commandMqtt);
- });
- List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>();
- jMap.forEach((deviceType, commandMqttss) -> {
- List<Tuple.Tuple3<Long, Long, Boolean>> data = FactoryUtils.getServiceRoute(DeviceFactory.class, deviceType)
- .senCommand(commandMqttss).getData();
- if (data != null) {
- list.addAll(data);
- }
- });
- JList<JList<CommandMqtt>> values = jMap.getValues();
- values.forEach(value -> value.forEach(commandMqtt -> {
- JSONObject templet = commandMqtt.getTemplet();
- templet.set("search", String.format("%s_%s", commandMqtt.getDeviceId(), templet.getJSONObject("data").getStr("type")));
- commandMqtt.setTemplet(templet);
- }));
- return R.ok(list).setLogMsg(JSONUtil.parseArray(values).toString());
- }
- @ApiOperation("指令Ack")
- @PostMapping("commandAck")
- public R commandAck(@RequestBody @Validated AckMqttDto ackMqttDto) {
- MqttCommand mqttCommand = getById(ackMqttDto.getSn());
- if (!Emptys.check(mqttCommand)) {
- return R.fail("数据不存在");
- }
- //写入mysql
- mqttCommand.setStatus(Integer.valueOf(ackMqttDto.getStatusEnum().getKey()))
- .setBackClientTime(DataTime.toString(ackMqttDto.getBackClientTime()))
- .setBackServerTime(DataTime.getSring());
- updateById(mqttCommand);
- //写入redis
- String result = ackMqttDto.getResult().toString();
- if (Emptys.check(ackMqttDto.getResult())) {
- String key = String.format(keyStr, ackMqttDto.getSn());
- String backClientTime = DataTime.toString(ackMqttDto.getBackClientTime());
- Map<String, String> map = redisService.getMap(key);
- if (Emptys.check(map)) {
- String time = map.get("time");
- if (DataTime.stringContrast(time, backClientTime) >= 0) {
- return R.ok();
- }
- }
- JMap<String, String> jmap = new JHashMap<String, String>().set("time", backClientTime).set("result", result);
- redisService.setMap(key, jmap);
- redisService.timeout(key, 300);
- }
- return R.ok();
- }
- @ApiOperation("指令结果通知")
- @PostMapping("commandResultBack")
- public R commandResultBack(@RequestBody @Validated MqttCmdDto.ResultBack resultBack) {
- return FactoryUtils.getService(CmdService.class, resultBack.getCmdType()).resultBack(resultBack);
- }
- @PostMapping("snByCmdAndResult")
- @ApiOperation("sn查询指令和结果数据")
- public R<MqttCmdDto.Vo3> snByCmdAndResult(@RequestBody MqttCmdDto.SnByCmdAndResult snByCmdAndResult) {
- MqttCommand mqttCommand = getById(snByCmdAndResult.getSn());
- String key = String.format(keyStr, snByCmdAndResult.getSn());
- Map<String, String> map = redisService.getMap(key);
- MqttCmdDto.Vo3 vo3 = copy(MqttCmdDto.Vo3.class, mqttCommand);
- if (Emptys.check(map)) {
- String result = map.get("result");
- vo3.setResult(JSONUtil.parseObj(result));
- }
- return R.ok(vo3);
- }
- @PostMapping("page")
- @ApiOperation("分页查询")
- public R<PageBean<MqttCmdDto.Vo2>> page(@RequestBody MqttCmdDto.Page page) {
- LambdaQueryWrapper<MqttCommand> lambdaQueryWrapper = new MybatisPlusQuery().eqWrapper(page, MqttCommand.class)
- .like(MqttCommand::getError)
- .like(MqttCommand::getValue)
- .ge(MqttCommand::getSendTime, page.getBeginSendTime())
- .le(MqttCommand::getSendTime, page.getEndSendTime())
- .ge(MqttCommand::getTimeout, page.getBeginTimeout())
- .le(MqttCommand::getTimeout, page.getEndTimeout())
- .ge(MqttCommand::getDelayTime, page.getBeginDelayTime())
- .le(MqttCommand::getDelayTime, page.getEndDelayTime())
- .build()
- .orderByDesc(MqttCommand::getSendTime);
- PageBean pageBean = page.getPage();
- IPage<MqttCommand> iPage = page(toIPage(pageBean), lambdaQueryWrapper);
- return R.ok(toPageBean(MqttCmdDto.Vo2.class, iPage));
- }
- /**
- * 新增指令记录
- *
- * @param mqttDto 参数对象
- * @param value 参数json对象
- * @param status 状态
- * @param error 错误信息
- */
- private void saveMqttCommand(MqttCmdDto mqttDto, String topic, JSONObject value, MqttCommandStatusEnum status, String error) {
- PaterDto pater = mqttDto.getPater();
- MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater)
- .setSn(pater.getSn())
- .setWkSn(pater.getWkSn())
- .setTopic(topic)
- .setStatus(Integer.parseInt(status.getKey()))
- .setValue(value.toString())
- .setSendTime(pater.getTime())
- .setNum(1)
- .setError(error);
- save(mqttCommand);
- }
- }
|