package com.xy.service; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.xy.annotation.LogOperate; import com.xy.collections.list.JArrayList; import com.xy.collections.list.JList; import com.xy.collections.map.JHashMap; import com.xy.collections.map.JMap; import com.xy.dto.*; import com.xy.entity.DeviceInfo; import com.xy.entity.MqttCommand; import com.xy.mapper.MqttCommandMapper; import com.xy.service.factory.cmd.CmdService; import com.xy.service.factory.device.DeviceFactory; import com.xy.utils.*; import com.xy.utils.enums.MqttCommandStatusEnum; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import java.util.ArrayList; import java.util.List; import java.util.Map; import static com.xy.utils.PlusBeans.*; @Slf4j @Service @Api(tags = "mqtt接口") @RequiredArgsConstructor public class MqttServiceImpl extends ServiceImpl implements MqttService { private final DeviceMqttSendService deviceMqttSendService; private final RedisService redisService; private final DeviceInfoServiceImpl deviceInfoService; private String keyStr = "mqtt:cmd:result:%d"; public long getSn() { return redisService.atomicIncrease(CommConsts.CMD_ATOM_SN); } public long getWkSn() { return redisService.atomicIncrease(CommConsts.CMD_ATOM_WK_SN); } public List> sendMqtt(List mqttDtos) { String time = DataTime.getSring(); List> list = new ArrayList<>(mqttDtos.size()); mqttDtos.forEach(mqttDto -> { //给予默认值 PaterDto pater = mqttDto.getPater() .setSn(getSn()) .setTime(time) .setDeviceId(mqttDto.getDeviceId()) .setDebug(mqttDto.getDebug()); //延迟发布处理 String delayTime = mqttDto.getDelayTime(); String topic = mqttDto.getDeviceId() + CommConsts.DEVICE_MQTT_TOPIC_SUFFIX; if (!StringUtils.isEmpty(delayTime)) { //消息消费时间 - 发布时间的差值 = 距今被消费的秒数 long m = DataTime.diff(DataTime.toLocal(pater.getTime()), DataTime.toLocal(delayTime), "s"); topic = String.format("$delayed/%d/%s", m, topic); mqttDto.setTimeout(mqttDto.getTimeout() + (int) m); } //消息过期时间 pater.setTimeout(mqttDto.getTimeout()); JSONObject value = JSONUtil.parseObj(pater); Long wkSn = pater.getWkSn(); try { //指令记录 saveMqttCommand(mqttDto, topic, value, pater.getAck() ? MqttCommandStatusEnum.COMMAND_SEND : MqttCommandStatusEnum.COMMAND_EXE_SUCCESS, null); //发送消息 deviceMqttSendService.cmd(new MqttDto.CmdParams() .setQos(mqttDto.getLevel()) .setData(value.toString()) .setTopic(topic) ); list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), true)); } catch (Exception e) { //指令记录 saveMqttCommand(mqttDto, topic, value, MqttCommandStatusEnum.COMMAND_SEND_FAIL, e.getMessage()); list.add(new Tuple.Tuple3<>(wkSn, pater.getSn(), false)); } }); return list; } @Override @LogOperate @ApiOperation("指令发送") public R>> senCommand(List commandMqtts) { JList commandMqttsList = new JArrayList<>(commandMqtts); List deviceInfos = deviceInfoService.list(new LambdaQueryWrapper().in(DeviceInfo::getDeviceId, commandMqttsList.getProperty(CommandMqtt::getDeviceId).comparing())); JMap deviceInfosJMaps = new JArrayList<>(deviceInfos).toMap(DeviceInfo::getDeviceId).cover(); JMap> jMap = new JHashMap<>(); commandMqtts.forEach(commandMqtt -> { DeviceInfo deviceInfo = deviceInfosJMaps.get(commandMqtt.getDeviceId()); if (deviceInfo == null) { return; } if (!jMap.containsKey(deviceInfo.getDeviceType())) { jMap.put(deviceInfo.getDeviceType(), new JArrayList<>()); } jMap.get(deviceInfo.getDeviceType()).set(commandMqtt); }); List> list = new ArrayList<>(); jMap.forEach((deviceType, commandMqttss) -> { List> data = FactoryUtils.getServiceRoute(DeviceFactory.class, deviceType) .senCommand(commandMqttss).getData(); if (data != null) { list.addAll(data); } }); JList> values = jMap.getValues(); values.forEach(value -> value.forEach(commandMqtt -> { JSONObject templet = commandMqtt.getTemplet(); templet.set("search", String.format("%s_%s", commandMqtt.getDeviceId(), templet.getJSONObject("data").getStr("type"))); commandMqtt.setTemplet(templet); })); return R.ok(list).setLogMsg(JSONUtil.parseArray(values).toString()); } @ApiOperation("指令Ack") @PostMapping("commandAck") public R commandAck(@RequestBody @Validated AckMqttDto ackMqttDto) { MqttCommand mqttCommand = getById(ackMqttDto.getSn()); if (!Emptys.check(mqttCommand)) { return R.fail("数据不存在"); } //写入mysql mqttCommand.setStatus(Integer.valueOf(ackMqttDto.getStatusEnum().getKey())) .setBackClientTime(DataTime.toString(ackMqttDto.getBackClientTime())) .setBackServerTime(DataTime.getSring()); updateById(mqttCommand); //写入redis String result = ackMqttDto.getResult().toString(); if (Emptys.check(ackMqttDto.getResult())) { String key = String.format(keyStr, ackMqttDto.getSn()); String backClientTime = DataTime.toString(ackMqttDto.getBackClientTime()); Map map = redisService.getMap(key); if (Emptys.check(map)) { String time = map.get("time"); if (DataTime.stringContrast(time, backClientTime) >= 0) { return R.ok(); } } JMap jmap = new JHashMap().set("time", backClientTime).set("result", result); redisService.setMap(key, jmap); redisService.timeout(key, 300); } return R.ok(); } @ApiOperation("指令结果通知") @PostMapping("commandResultBack") public R commandResultBack(@RequestBody @Validated MqttCmdDto.ResultBack resultBack) { return FactoryUtils.getService(CmdService.class, resultBack.getCmdType()).resultBack(resultBack); } @PostMapping("snByCmdAndResult") @ApiOperation("sn查询指令和结果数据") public R snByCmdAndResult(@RequestBody MqttCmdDto.SnByCmdAndResult snByCmdAndResult) { MqttCommand mqttCommand = getById(snByCmdAndResult.getSn()); String key = String.format(keyStr, snByCmdAndResult.getSn()); Map map = redisService.getMap(key); MqttCmdDto.Vo3 vo3 = copy(MqttCmdDto.Vo3.class, mqttCommand); if (Emptys.check(map)) { String result = map.get("result"); vo3.setResult(JSONUtil.parseObj(result)); } return R.ok(vo3); } @PostMapping("page") @ApiOperation("分页查询") public R> page(@RequestBody MqttCmdDto.Page page) { LambdaQueryWrapper lambdaQueryWrapper = new MybatisPlusQuery().eqWrapper(page, MqttCommand.class) .like(MqttCommand::getError) .like(MqttCommand::getValue) .ge(MqttCommand::getSendTime, page.getBeginSendTime()) .le(MqttCommand::getSendTime, page.getEndSendTime()) .ge(MqttCommand::getTimeout, page.getBeginTimeout()) .le(MqttCommand::getTimeout, page.getEndTimeout()) .ge(MqttCommand::getDelayTime, page.getBeginDelayTime()) .le(MqttCommand::getDelayTime, page.getEndDelayTime()) .build() .orderByDesc(MqttCommand::getSendTime); PageBean pageBean = page.getPage(); IPage iPage = page(toIPage(pageBean), lambdaQueryWrapper); return R.ok(toPageBean(MqttCmdDto.Vo2.class, iPage)); } /** * 新增指令记录 * * @param mqttDto 参数对象 * @param value 参数json对象 * @param status 状态 * @param error 错误信息 */ private void saveMqttCommand(MqttCmdDto mqttDto, String topic, JSONObject value, MqttCommandStatusEnum status, String error) { PaterDto pater = mqttDto.getPater(); MqttCommand mqttCommand = Beans.copy(Beans.copy(MqttCommand.class, mqttDto), pater) .setSn(pater.getSn()) .setWkSn(pater.getWkSn()) .setTopic(topic) .setStatus(Integer.parseInt(status.getKey())) .setValue(value.toString()) .setSendTime(pater.getTime()) .setNum(1) .setError(error); save(mqttCommand); } }