package com.xy.service; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.xy.alipay.AliPay; import com.xy.annotate.Runners; import com.xy.annotate.Timer; import com.xy.collections.map.JHashMap; import com.xy.collections.map.JMap; import com.xy.consumer.cmd.CmdProducer; import com.xy.dto.AckMqttDto; import com.xy.dto.CommandMqtt; import com.xy.dto.MqttDto; import com.xy.dto.PaterDto; import com.xy.entity.MqttCommand; import com.xy.error.CommRuntimeException; import com.xy.mapper.MqttCommandMapper; import com.xy.utils.*; import com.xy.utils.consts.CommConsts; import com.xy.utils.enums.CmdTypeEnum; import com.xy.utils.enums.MqttCommandStatusEnum; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import static com.xy.utils.PlusBeans.*; @Slf4j @Service @Api(tags = "mqtt接口") @RequiredArgsConstructor public class MqttServiceImpl extends ServiceImpl implements MqttService { private final CmdProducer cmdProducer; private final RedisService redisService; private String keyStr = "mqtt:cmd:result:%d"; private boolean snInit; private long getSn() { if (!snInit) { throw new CommRuntimeException("sn号未初始化"); } return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN); } private long getWkSn() { if (!snInit) { throw new CommRuntimeException("sn号未初始化"); } return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN); } private List> sendMqtt(List mqttDtos) { String time = DataTime.getSring(); List> list = new ArrayList<>(mqttDtos.size()); mqttDtos.forEach(mqttDto -> { //给予默认值 PaterDto pater = mqttDto.getPater() .setSn(getSn()) .setTime(time) .setDeviceId(mqttDto.getDeviceId()) .setDebug(mqttDto.getDebug()); //延迟发布处理 String delayTime = mqttDto.getDelayTime(); String topic = mqttDto.getDeviceId() + CommConsts.DEVICE_MQTT_TOPIC_SUFFIX; if (!StringUtils.isEmpty(delayTime)) { //消息消费时间 - 发布时间的差值 = 距今被消费的秒数 long m = DataTime.diff(DataTime.toLocal(pater.getTime()), DataTime.toLocal(delayTime), "s"); topic = String.format("$delayed/%d/%s", m, topic); mqttDto.setTimeout(mqttDto.getTimeout() + (int) m); } //消息过期时间 pater.setTimeout(mqttDto.getTimeout()); JSONObject value = JSONUtil.parseObj(pater); Long wkSn = pater.getWkSn(); try { //指令记录 saveMqttCommand(mqttDto, topic, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_EXE_SUCCESS, null); //发送消息 cmdProducer.sendToMqtt(value.toString(), topic, mqttDto.getLevel()); list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), true)); } catch (Exception e) { //指令记录 saveMqttCommand(mqttDto, topic, value, MqttCommandStatusEnum.COMMAND_SEND_FAIL, e.getMessage()); list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), false)); } }); return list; } @Timer @Runners @PostMapping("sysnAtoms") @ApiOperation("同步sn号redis原子变量") public void sysnAtoms() { //todo 集群有问题,暂不处理 String sn = LambdaUtils.getProperty(MqttCommand::getSn); QueryWrapper wrapper = new QueryWrapper() .select(String.format("max(%s) as %s", sn, sn)); MqttCommand mqttCommand = getOne(wrapper); if (mqttCommand != null) { redisService.setAtomic(CommConsts.CMD_ATOM_SN, mqttCommand.getSn() + 1); MqttCommand byId = getById(mqttCommand.getSn()); redisService.setAtomic(CommConsts.CMD_ATOM_WK_SN, byId.getWkSn() + 1); } snInit = true; } @Override @ApiOperation("指令发送") @AliPay(AliPay.Type.RESTART) public R>> senCommand(List commandMqtts) { //发送指令 List mqttDtos = new ArrayList<>(); commandMqtts.forEach(commandMqtt -> { JSONObject templetObj = commandMqtt.getTemplet(); String debug = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDebug)); String level = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getLevel)); String delayTime = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getDelayTime)); String timeout = templetObj.getStr(LambdaUtils.getProperty(MqttDto::getTimeout)); String wkSn = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getWkSn)); String actionType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getActionType)); String cmdType = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getCmdType)); String ack = templetObj.getStr(LambdaUtils.getProperty(PaterDto::getAck)); JSONObject data = templetObj.getJSONObject(LambdaUtils.getProperty(PaterDto::getData)); PaterDto paterDto = new PaterDto() .setActionType(actionType) .setCmdType(cmdType) .setAck(Emptys.check(ack) && !ack.equals("$ack$") ? Boolean.valueOf(ack) : Emptys.check(ack) && ack.equals("$ack$") ? true : true) .setWkSn(!Emptys.check(wkSn) ? getWkSn() : "$wkSn$".equals(wkSn) ? getWkSn() : Long.valueOf(wkSn)) .setData(data); MqttDto mqttDto = new MqttDto() .setDeviceId(commandMqtt.getDeviceId()) .setDebug(Emptys.check(debug) && !debug.equals("$debug$") ? Boolean.valueOf(debug) : Emptys.check(debug) && debug.equals("$debug$") ? false : false) .setLevel(Emptys.check(level) && !level.equals("$level$") ? Integer.valueOf(level) : Emptys.check(level) && level.equals("$level$") ? 1 : 1) .setDelayTime(Emptys.check(delayTime) && !delayTime.equals("$delayTime$") ? delayTime : Emptys.check(delayTime) && delayTime.equals("$delayTime$") ? null : null) .setTimeout(Emptys.check(timeout) && !timeout.equals("$timeout$") ? Integer.valueOf(timeout) : Emptys.check(timeout) && timeout.equals("$timeout$") ? 60 : 60) .setPater(paterDto); mqttDtos.add(mqttDto); }); List> list = sendMqtt(mqttDtos); //执行业务 Map> map = new HashMap<>(); mqttDtos.forEach(mqttDto -> { String cmdType = mqttDto.getPater().getCmdType(); if (!map.containsKey(cmdType)) { map.put(cmdType, new ArrayList<>()); } map.get(cmdType).add(mqttDto); }); map.forEach((k, v) -> { CmdTypeEnum anEnum = CmdTypeEnum.getEnum(k); if (anEnum == null) { return; } SpringBeanUtils.getBean(anEnum.getCmdServiceClass()).senCommand(v); }); return R.ok(list); } @ApiOperation("指令Ack") @PostMapping("commandAck") public R commandAck(@RequestBody @Validated AckMqttDto ackMqttDto) { MqttCommand mqttCommand = getById(ackMqttDto.getSn()); if (!Emptys.check(mqttCommand)) { return R.fail("数据不存在"); } //写入mysql mqttCommand.setStatus(Integer.valueOf(ackMqttDto.getStatusEnum().getKey())) .setBackClientTime(DataTime.toString(ackMqttDto.getBackClientTime())) .setBackServerTime(DataTime.getSring()); updateById(mqttCommand); //写入redis String result = ackMqttDto.getResult().toString(); if (Emptys.check(ackMqttDto.getResult())) { String key = String.format(keyStr, ackMqttDto.getSn()); String backClientTime = DataTime.toString(ackMqttDto.getBackClientTime()); Map map = redisService.getMap(key); if (Emptys.check(map)) { String time = map.get("time"); if (DataTime.stringContrast(time, backClientTime) < 0) { return R.ok(); } } JMap jmap = new JHashMap().set("time", backClientTime).set("result", result); redisService.setMap(key, jmap); redisService.timeout(key, 300); } return R.ok(); } @ApiOperation("指令结果通知") @PostMapping("commandResultBack") public R commandResultBack(@RequestBody @Validated MqttDto.ResultBack resultBack) { return SpringBeanUtils.getBean(CmdTypeEnum.getEnum(resultBack.getCmdType()).getCmdServiceClass()).resultBack(resultBack); } @PostMapping("snByCmdAndResult") @ApiOperation("sn查询指令和结果数据") public R snByCmdAndResult(@RequestBody MqttDto.SnByCmdAndResult snByCmdAndResult) { MqttCommand mqttCommand = getById(snByCmdAndResult.getSn()); String key = String.format(keyStr, snByCmdAndResult.getSn()); Map map = redisService.getMap(key); MqttDto.Vo3 vo3 = copy(MqttDto.Vo3.class, mqttCommand); if (Emptys.check(map)) { String result = map.get("result"); vo3.setResult(JSONUtil.parseObj(result)); } return R.ok(vo3); } @PostMapping("page") @ApiOperation("分页查询") public R> page(@RequestBody MqttDto.Page page) { LambdaQueryWrapper lambdaQueryWrapper = new MybatisPlusQuery().eqWrapper(page, MqttCommand.class) .like(MqttCommand::getError) .like(MqttCommand::getValue) .ge(MqttCommand::getSendTime, page.getBeginSendTime()) .le(MqttCommand::getSendTime, page.getEndSendTime()) .ge(MqttCommand::getTimeout, page.getBeginTimeout()) .le(MqttCommand::getTimeout, page.getEndTimeout()) .ge(MqttCommand::getDelayTime, page.getBeginDelayTime()) .le(MqttCommand::getDelayTime, page.getEndDelayTime()) .build() .orderByDesc(MqttCommand::getSendTime); PageBean pageBean = page.getPage(); IPage iPage = page(toIPage(pageBean), lambdaQueryWrapper); return R.ok(toPageBean(MqttDto.Vo2.class, iPage)); } /** * 新增指令记录 * * @param mqttDto 参数对象 * @param value 参数json对象 * @param status 状态 * @param error 错误信息 */ private void saveMqttCommand(MqttDto mqttDto, String topic, JSONObject value, MqttCommandStatusEnum status, String error) { PaterDto pater = mqttDto.getPater(); MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater) .setSn(pater.getSn()) .setWkSn(pater.getWkSn()) .setTopic(topic) .setStatus(Integer.parseInt(status.getKey())) .setValue(value.toString()) .setSendTime(pater.getTime()) .setNum(1) .setError(error); save(mqttCommand); } }