DataConsumer.java 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package com.xy.consumer.data;
  2. import cn.hutool.json.JSONObject;
  3. import cn.hutool.json.JSONUtil;
  4. import com.xy.config.DeviceThreadPoolConfig;
  5. import com.xy.consumer.MqttConsumer;
  6. import com.xy.dto.DataMqttVo;
  7. import com.xy.dto.DeviceDataDto;
  8. import com.xy.service.DeviceDataServiceImpl;
  9. import com.xy.service.DeviceStatusServiceImpl;
  10. import com.xy.utils.Emptys;
  11. import com.xy.utils.ThreadPoolUtils;
  12. import lombok.AllArgsConstructor;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.springframework.stereotype.Service;
  15. import java.util.List;
  16. /**
  17. * <p>
  18. * 数据统计消费者
  19. * </p>
  20. *
  21. * @author lijin
  22. * @since 2022-03-11
  23. */
  24. @Slf4j
  25. @Service
  26. @AllArgsConstructor
  27. public class DataConsumer implements MqttConsumer {
  28. private DeviceDataServiceImpl deviceDataService;
  29. private DeviceStatusServiceImpl deviceStatusService;
  30. @Override
  31. public boolean message(String topic, String payload) {
  32. ThreadPoolUtils.excPoll(DeviceThreadPoolConfig.DEVICE_DATA_POLL, 1)
  33. .execute(() -> {
  34. JSONObject jsonObject = JSONUtil.parseObj(payload);
  35. DataMqttVo dataMqttVo = jsonObject.toBean(DataMqttVo.class);
  36. Boolean type = dataMqttVo.getType();
  37. List<DataMqttVo.Goods> goods = dataMqttVo.getGoods();
  38. Integer money = dataMqttVo.getMoney();
  39. int number = 0;
  40. if (Emptys.check(goods)) {
  41. for (DataMqttVo.Goods good : goods) {
  42. number = number + good.getNumber();
  43. }
  44. }
  45. //设备统计
  46. DeviceDataDto.SaveOrAccum saveOrAccum = new DeviceDataDto.SaveOrAccum()
  47. .setDeviceId(dataMqttVo.getDeviceId());
  48. saveOrAccum.setMercId(dataMqttVo.getMercId());
  49. if (type) {
  50. //购买
  51. if (dataMqttVo.getIsError()) {
  52. //异常单
  53. saveOrAccum.setRiskCount(1);
  54. } else {
  55. if (money == null || money == 0) {
  56. //0元单
  57. saveOrAccum.setZeroCount(1);
  58. } else {
  59. //正常单,累计商品数量
  60. saveOrAccum.setSalesMoney(money);
  61. saveOrAccum.setSalesCount(1);
  62. saveOrAccum.setGoodsCount(number);
  63. }
  64. }
  65. } else {
  66. //退款
  67. saveOrAccum.setRefundMoney(money);
  68. saveOrAccum.setRefundCount(1);
  69. }
  70. deviceDataService.saveOrAccum(saveOrAccum);
  71. //修改库存
  72. if (number > 0) {
  73. deviceStatusService.upStock(dataMqttVo.getDeviceId(), type ? (~(number - 1)) : number);
  74. }
  75. })
  76. .error(e -> log.error("统计消费者异常,{}", e));
  77. return true;
  78. }
  79. }