123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- package com.xy.service;
- import cn.hutool.json.JSONObject;
- import cn.hutool.json.JSONUtil;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.baomidou.mybatisplus.core.metadata.IPage;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import com.xy.alipay.AliPay;
- import com.xy.annotate.Runners;
- import com.xy.annotate.Timer;
- import com.xy.collections.map.JHashMap;
- import com.xy.collections.map.JMap;
- import com.xy.consumer.cmd.CmdProducer;
- import com.xy.dto.AckMqttDto;
- import com.xy.dto.CommandMqtt;
- import com.xy.dto.MqttDto;
- import com.xy.dto.PaterDto;
- import com.xy.entity.MqttCommand;
- import com.xy.error.CommRuntimeException;
- import com.xy.mapper.MqttCommandMapper;
- import com.xy.utils.*;
- import com.xy.utils.consts.CommConsts;
- import com.xy.utils.enums.CmdTypeEnum;
- import com.xy.utils.enums.MqttCommandStatusEnum;
- import io.swagger.annotations.Api;
- import io.swagger.annotations.ApiOperation;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import org.springframework.util.StringUtils;
- import org.springframework.validation.annotation.Validated;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestBody;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import static com.xy.utils.PlusBeans.*;
- @Slf4j
- @Service
- @Api(tags = "mqtt接口")
- @RequiredArgsConstructor
- public class MqttServiceImpl extends ServiceImpl<MqttCommandMapper, MqttCommand> implements MqttService {
- private final CmdProducer cmdProducer;
- private final RedisService<String> redisService;
- private String keyStr = "mqtt:cmd:result:%d";
- private boolean snInit;
- private long getSn() {
- if (!snInit) {
- throw new CommRuntimeException("sn号未初始化");
- }
- return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN);
- }
- private long getWkSn() {
- if (!snInit) {
- throw new CommRuntimeException("sn号未初始化");
- }
- return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN);
- }
- private List<Tuple.Tuple3<Long, Long, Boolean>> sendMqtt(List<MqttDto> mqttDtos) {
- String time = DataTime.getSring();
- List<Tuple.Tuple3<Long, Long, Boolean>> list = new ArrayList<>(mqttDtos.size());
- mqttDtos.forEach(mqttDto -> {
- //给予默认值
- PaterDto pater = mqttDto.getPater()
- .setSn(getSn())
- .setTime(time)
- .setDeviceId(mqttDto.getDeviceId())
- .setDebug(mqttDto.getDebug());
- //延迟发布处理
- String delayTime = mqttDto.getDelayTime();
- String topic = mqttDto.getDeviceId() + CommConsts.DEVICE_MQTT_TOPIC_SUFFIX;
- if (!StringUtils.isEmpty(delayTime)) {
- //消息消费时间 - 发布时间的差值 = 距今被消费的秒数
- long m = DataTime.diff(DataTime.toLocal(pater.getTime()), DataTime.toLocal(delayTime), "s");
- topic = String.format("$delayed/%d/%s", m, topic);
- mqttDto.setTimeout(mqttDto.getTimeout() + (int) m);
- }
- //消息过期时间
- pater.setTimeout(mqttDto.getTimeout());
- JSONObject value = JSONUtil.parseObj(pater);
- Long wkSn = pater.getWkSn();
- try {
- //指令记录
- saveMqttCommand(mqttDto, topic, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_EXE_SUCCESS, null);
- //发送消息
- cmdProducer.sendToMqtt(value.toString(), topic, mqttDto.getLevel());
- list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), true));
- } catch (Exception e) {
- //指令记录
- saveMqttCommand(mqttDto, topic, value, MqttCommandStatusEnum.COMMAND_SEND_FAIL, e.getMessage());
- list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), false));
- }
- });
- return list;
- }
- @Timer
- @Runners
- @PostMapping("sysnAtoms")
- @ApiOperation("同步sn号redis原子变量")
- public void sysnAtoms() {
- //todo 集群有问题,暂不处理
- String sn = LambdaUtils.getProperty(MqttCommand::getSn);
- QueryWrapper<MqttCommand> wrapper = new QueryWrapper<MqttCommand>()
- .select(String.format("max(%s) as %s", sn, sn));
- MqttCommand mqttCommand = getOne(wrapper);
- if (mqttCommand != null) {
- redisService.setAtomic(CommConsts.CMD_ATOM_SN, mqttCommand.getSn() + 1);
- MqttCommand byId = getById(mqttCommand.getSn());
- redisService.setAtomic(CommConsts.CMD_ATOM_WK_SN, byId.getWkSn() + 1);
- }
- snInit = true;
- }
- @Override
- @ApiOperation("指令发送")
- @AliPay(AliPay.Type.RESTART)
- public R<List<Tuple.Tuple3<Long, Long, Boolean>>> senCommand(List<CommandMqtt> commandMqtts) {
- //发送指令
- List<MqttDto> mqttDtos = new ArrayList<>();
- commandMqtts.forEach(commandMqtt -> {
- JSONObject templetObj = commandMqtt.getTemplet();
- String debug = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDebug));
- String level = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getLevel));
- String delayTime = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDelayTime));
- String timeout = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getTimeout));
- String wkSn = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getWkSn));
- String actionType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getActionType));
- String cmdType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getCmdType));
- String ack = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getAck));
- JSONObject data = templetObj.getJSONObject(LambdaUtils.getProperty(PaterDto::getData));
- PaterDto paterDto = new PaterDto()
- .setActionType(actionType)
- .setCmdType(cmdType)
- .setAck(Emptys.check(ack) && !ack.equals("$ack$") ? Boolean.valueOf(ack) : Emptys.check(ack) && ack.equals("$ack$") ? true : true)
- .setWkSn(!Emptys.check(wkSn) ? getWkSn() : "$wkSn$".equals(wkSn) ? getWkSn() : Long.valueOf(wkSn))
- .setData(data);
- MqttDto mqttDto = new MqttDto()
- .setDeviceId(commandMqtt.getDeviceId())
- .setDebug(Emptys.check(debug) && !debug.equals("$debug$") ? Boolean.valueOf(debug) : Emptys.check(debug) && debug.equals("$debug$") ? false : false)
- .setLevel(Emptys.check(level) && !level.equals("$level$") ? Integer.valueOf(level) : Emptys.check(level) && level.equals("$level$") ? 1 : 1)
- .setDelayTime(Emptys.check(delayTime) && !delayTime.equals("$delayTime$") ? delayTime : Emptys.check(delayTime) && delayTime.equals("$delayTime$") ? null : null)
- .setTimeout(Emptys.check(timeout) && !timeout.equals("$timeout$") ? Integer.valueOf(timeout) : Emptys.check(timeout) && timeout.equals("$timeout$") ? 60 : 60)
- .setPater(paterDto);
- mqttDtos.add(mqttDto);
- });
- List<Tuple.Tuple3<Long, Long, Boolean>> list = sendMqtt(mqttDtos);
- //执行业务
- Map<String, List<MqttDto>> map = new HashMap<>();
- mqttDtos.forEach(mqttDto -> {
- String cmdType = mqttDto.getPater().getCmdType();
- if (!map.containsKey(cmdType)) {
- map.put(cmdType, new ArrayList<>());
- }
- map.get(cmdType).add(mqttDto);
- });
- map.forEach((k, v) -> {
- CmdTypeEnum anEnum = CmdTypeEnum.getEnum(k);
- if (anEnum == null) {
- return;
- }
- SpringBeanUtils.getBean(anEnum.getCmdServiceClass()).senCommand(v);
- });
- return R.ok(list);
- }
- @ApiOperation("指令Ack")
- @PostMapping("commandAck")
- public R commandAck(@RequestBody @Validated AckMqttDto ackMqttDto) {
- MqttCommand mqttCommand = getById(ackMqttDto.getSn());
- if (!Emptys.check(mqttCommand)) {
- return R.fail("数据不存在");
- }
- //写入mysql
- mqttCommand.setStatus(Integer.valueOf(ackMqttDto.getStatusEnum().getKey()))
- .setBackClientTime(DataTime.toString(ackMqttDto.getBackClientTime()))
- .setBackServerTime(DataTime.getSring());
- updateById(mqttCommand);
- //写入redis
- String result = ackMqttDto.getResult().toString();
- if (Emptys.check(ackMqttDto.getResult())) {
- String key = String.format(keyStr, ackMqttDto.getSn());
- String backClientTime = DataTime.toString(ackMqttDto.getBackClientTime());
- Map<String, String> map = redisService.getMap(key);
- if (Emptys.check(map)) {
- String time = map.get("time");
- if (DataTime.stringContrast(time, backClientTime) < 0) {
- return R.ok();
- }
- }
- JMap<String, String> jmap = new JHashMap<String, String>().set("time", backClientTime).set("result", result);
- redisService.setMap(key, jmap);
- redisService.timeout(key, 300);
- }
- return R.ok();
- }
- @ApiOperation("指令结果通知")
- @PostMapping("commandResultBack")
- public R commandResultBack(@RequestBody @Validated MqttDto.ResultBack resultBack) {
- return SpringBeanUtils.getBean(CmdTypeEnum.getEnum(resultBack.getCmdType()).getCmdServiceClass()).resultBack(resultBack);
- }
- @PostMapping("snByCmdAndResult")
- @ApiOperation("sn查询指令和结果数据")
- public R<MqttDto.Vo3> snByCmdAndResult(@RequestBody MqttDto.SnByCmdAndResult snByCmdAndResult) {
- MqttCommand mqttCommand = getById(snByCmdAndResult.getSn());
- String key = String.format(keyStr, snByCmdAndResult.getSn());
- Map<String, String> map = redisService.getMap(key);
- MqttDto.Vo3 vo3 = copy(MqttDto.Vo3.class, mqttCommand);
- if (Emptys.check(map)) {
- String result = map.get("result");
- vo3.setResult(JSONUtil.parseObj(result));
- }
- return R.ok(vo3);
- }
- @PostMapping("page")
- @ApiOperation("分页查询")
- public R<PageBean<MqttDto.Vo2>> page(@RequestBody MqttDto.Page page) {
- LambdaQueryWrapper<MqttCommand> lambdaQueryWrapper = new MybatisPlusQuery().eqWrapper(page, MqttCommand.class)
- .like(MqttCommand::getError)
- .like(MqttCommand::getValue)
- .ge(MqttCommand::getSendTime, page.getBeginSendTime())
- .le(MqttCommand::getSendTime, page.getEndSendTime())
- .ge(MqttCommand::getTimeout, page.getBeginTimeout())
- .le(MqttCommand::getTimeout, page.getEndTimeout())
- .ge(MqttCommand::getDelayTime, page.getBeginDelayTime())
- .le(MqttCommand::getDelayTime, page.getEndDelayTime())
- .build()
- .orderByDesc(MqttCommand::getSendTime);
- PageBean pageBean = page.getPage();
- IPage<MqttCommand> iPage = page(toIPage(pageBean), lambdaQueryWrapper);
- return R.ok(toPageBean(MqttDto.Vo2.class, iPage));
- }
- /**
- * 新增指令记录
- *
- * @param mqttDto 参数对象
- * @param value 参数json对象
- * @param status 状态
- * @param error 错误信息
- */
- private void saveMqttCommand(MqttDto mqttDto, String topic, JSONObject value, MqttCommandStatusEnum status, String error) {
- PaterDto pater = mqttDto.getPater();
- MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater)
- .setSn(pater.getSn())
- .setWkSn(pater.getWkSn())
- .setTopic(topic)
- .setStatus(Integer.parseInt(status.getKey()))
- .setValue(value.toString())
- .setSendTime(pater.getTime())
- .setNum(1)
- .setError(error);
- save(mqttCommand);
- }
- }
|