12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- package com.xy.consumer.data;
- import cn.hutool.json.JSONObject;
- import cn.hutool.json.JSONUtil;
- import com.xy.config.DeviceThreadPoolConfig;
- import com.xy.consumer.MqttConsumer;
- import com.xy.dto.DataMqttVo;
- import com.xy.dto.DeviceDataDto;
- import com.xy.service.DeviceDataServiceImpl;
- import com.xy.service.DeviceStatusServiceImpl;
- import com.xy.utils.Emptys;
- import com.xy.utils.ThreadPoolUtils;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import java.util.List;
- /**
- * <p>
- * 数据统计消费者
- * </p>
- *
- * @author lijin
- * @since 2022-03-11
- */
- @Slf4j
- @Service
- @AllArgsConstructor
- public class DataConsumer implements MqttConsumer {
- private DeviceDataServiceImpl deviceDataService;
- private DeviceStatusServiceImpl deviceStatusService;
- @Override
- public boolean message(String topic, String payload) {
- ThreadPoolUtils.excPoll(DeviceThreadPoolConfig.DEVICE_DATA_POLL, 1)
- .execute(() -> {
- JSONObject jsonObject = JSONUtil.parseObj(payload);
- DataMqttVo dataMqttVo = jsonObject.toBean(DataMqttVo.class);
- Boolean type = dataMqttVo.getType();
- List<DataMqttVo.Goods> goods = dataMqttVo.getGoods();
- Integer money = dataMqttVo.getMoney();
- int number = 0;
- if (Emptys.check(goods)) {
- for (DataMqttVo.Goods good : goods) {
- number = number + good.getNumber();
- }
- }
- //设备统计
- DeviceDataDto.SaveOrAccum saveOrAccum = new DeviceDataDto.SaveOrAccum()
- .setDeviceId(dataMqttVo.getDeviceId());
- saveOrAccum.setMercId(dataMqttVo.getMercId());
- if (type) {
- //购买
- if (dataMqttVo.getIsError()) {
- //异常单
- saveOrAccum.setRiskCount(1);
- } else {
- if (money == null || money == 0) {
- //0元单
- saveOrAccum.setZeroCount(1);
- } else {
- //正常单,累计商品数量
- saveOrAccum.setSalesMoney(money);
- saveOrAccum.setSalesCount(1);
- saveOrAccum.setGoodsCount(number);
- }
- }
- } else {
- //退款
- saveOrAccum.setRefundMoney(money);
- saveOrAccum.setRefundCount(1);
- }
- deviceDataService.saveOrAccum(saveOrAccum);
- //修改库存
- if (number > 0) {
- deviceStatusService.upStock(dataMqttVo.getDeviceId(), type ? (~(number - 1)) : number);
- }
- })
- .error(e -> log.error("统计消费者异常,{}", e));
- return true;
- }
- }
|