DataConsumer.java 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package com.xy.consumer.data;
  2. import cn.hutool.json.JSONObject;
  3. import cn.hutool.json.JSONUtil;
  4. import com.xy.annotate.MqttConsumerAsyn;
  5. import com.xy.config.DeviceThreadPoolConfig;
  6. import com.xy.consumer.MqttConsumer;
  7. import com.xy.dto.DataMqttVo;
  8. import com.xy.dto.DeviceDataDto;
  9. import com.xy.service.DeviceDataServiceImpl;
  10. import com.xy.service.DeviceStatusServiceImpl;
  11. import com.xy.utils.Emptys;
  12. import lombok.AllArgsConstructor;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.springframework.stereotype.Service;
  15. import java.util.List;
  16. /**
  17. * <p>
  18. * 数据统计消费者
  19. * </p>
  20. *
  21. * @author lijin
  22. * @since 2022-03-11
  23. */
  24. @Slf4j
  25. @Service
  26. @AllArgsConstructor
  27. @MqttConsumerAsyn(DeviceThreadPoolConfig.DEVICE_DATA_POLL)
  28. public class DataConsumer implements MqttConsumer {
  29. private DeviceDataServiceImpl deviceDataService;
  30. private DeviceStatusServiceImpl deviceStatusService;
  31. @Override
  32. public boolean message(String topic, String payload) {
  33. JSONObject jsonObject = JSONUtil.parseObj(payload);
  34. DataMqttVo dataMqttVo = jsonObject.toBean(DataMqttVo.class);
  35. Boolean type = dataMqttVo.getType();
  36. List<DataMqttVo.Goods> goods = dataMqttVo.getGoods();
  37. Integer money = dataMqttVo.getMoney();
  38. int number = 0;
  39. if (Emptys.check(goods)) {
  40. for (DataMqttVo.Goods good : goods) {
  41. number = number + good.getNumber();
  42. }
  43. }
  44. //设备统计
  45. DeviceDataDto.SaveOrAccum saveOrAccum = new DeviceDataDto.SaveOrAccum()
  46. .setDeviceId(dataMqttVo.getDeviceId());
  47. saveOrAccum.setMercId(dataMqttVo.getMercId());
  48. if (type) {
  49. //购买
  50. if (dataMqttVo.getIsError()) {
  51. //异常单
  52. saveOrAccum.setRiskCount(1);
  53. } else {
  54. if (money == null || money == 0) {
  55. //0元单
  56. saveOrAccum.setZeroCount(1);
  57. } else {
  58. //正常单,累计商品数量
  59. saveOrAccum.setSalesMoney(money);
  60. saveOrAccum.setSalesCount(1);
  61. saveOrAccum.setGoodsCount(number);
  62. }
  63. }
  64. } else {
  65. //退款
  66. saveOrAccum.setRefundMoney(money);
  67. saveOrAccum.setRefundCount(1);
  68. }
  69. deviceDataService.saveOrAccum(saveOrAccum);
  70. //修改库存
  71. if (number > 0) {
  72. deviceStatusService.upStock(dataMqttVo.getDeviceId(), type ? (~(number - 1)) : number);
  73. }
  74. return true;
  75. }
  76. }