|
@@ -7,6 +7,7 @@ import com.xy.consumer.MqttConsumer;
|
|
import com.xy.dto.DataMqttVo;
|
|
import com.xy.dto.DataMqttVo;
|
|
import com.xy.dto.DeviceDataDto;
|
|
import com.xy.dto.DeviceDataDto;
|
|
import com.xy.service.DeviceDataServiceImpl;
|
|
import com.xy.service.DeviceDataServiceImpl;
|
|
|
|
+import com.xy.service.DeviceStatusServiceImpl;
|
|
import com.xy.utils.Emptys;
|
|
import com.xy.utils.Emptys;
|
|
import com.xy.utils.ThreadPoolUtils;
|
|
import com.xy.utils.ThreadPoolUtils;
|
|
import lombok.AllArgsConstructor;
|
|
import lombok.AllArgsConstructor;
|
|
@@ -30,6 +31,8 @@ public class DataConsumer implements MqttConsumer {
|
|
|
|
|
|
private DeviceDataServiceImpl deviceDataService;
|
|
private DeviceDataServiceImpl deviceDataService;
|
|
|
|
|
|
|
|
+ private DeviceStatusServiceImpl deviceStatusService;
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public boolean message(String topic, String payload) {
|
|
public boolean message(String topic, String payload) {
|
|
ThreadPoolUtils.excPoll(DeviceThreadPoolConfig.DEVICE_DATA_POLL, 1)
|
|
ThreadPoolUtils.excPoll(DeviceThreadPoolConfig.DEVICE_DATA_POLL, 1)
|
|
@@ -71,6 +74,10 @@ public class DataConsumer implements MqttConsumer {
|
|
saveOrAccum.setRefundCount(1);
|
|
saveOrAccum.setRefundCount(1);
|
|
}
|
|
}
|
|
deviceDataService.saveOrAccum(saveOrAccum);
|
|
deviceDataService.saveOrAccum(saveOrAccum);
|
|
|
|
+ //修改库存
|
|
|
|
+ if (number > 0) {
|
|
|
|
+ deviceStatusService.upStock(dataMqttVo.getDeviceId(), type ? (~(number - 1)) : number);
|
|
|
|
+ }
|
|
})
|
|
})
|
|
.error(e -> log.error("统计消费者异常,{}", e));
|
|
.error(e -> log.error("统计消费者异常,{}", e));
|
|
return true;
|
|
return true;
|