DisconnectedMqttConfiguration.java 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package com.xy.consumer.disconnect;
  2. import com.xy.configuration.MqttConfigUtils;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.integration.annotation.IntegrationComponentScan;
  6. import org.springframework.integration.annotation.ServiceActivator;
  7. import org.springframework.integration.channel.DirectChannel;
  8. import org.springframework.integration.core.MessageProducer;
  9. import org.springframework.messaging.MessageChannel;
  10. import org.springframework.messaging.MessageHandler;
  11. @Configuration
  12. @IntegrationComponentScan
  13. public class DisconnectedMqttConfiguration {
  14. /**
  15. * topic
  16. */
  17. public final static String TOPIC = "disConnectedNotify";
  18. /**
  19. * 入站通道名(消费者)订阅的bean名称
  20. */
  21. public static final String CHANNEL_NAME_IN = TOPIC + "MqttInboundChannel";
  22. /**
  23. * 出站通道名(生产者)发布的bean名称
  24. */
  25. public static final String CHANNEL_NAME_OUT = TOPIC + "MqttOutboundChannel";
  26. /*******************************生产者*******************************************/
  27. /**
  28. * MQTT信息通道(生产者)
  29. */
  30. @Bean(name = CHANNEL_NAME_OUT)
  31. public MessageChannel mqttOutboundChannel() {
  32. return new DirectChannel();
  33. }
  34. /**
  35. * MQTT消息处理器(生产者)
  36. */
  37. @Bean(name = TOPIC + "MqttOutbound")
  38. @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
  39. public MessageHandler mqttOutbound() {
  40. return MqttConfigUtils.mqttOutbound(TOPIC);
  41. }
  42. /*******************************消费者*******************************************/
  43. /**
  44. * MQTT信息通道(消费者)
  45. */
  46. @Bean(name = CHANNEL_NAME_IN)
  47. public MessageChannel mqttInboundChannel() {
  48. return new DirectChannel();
  49. }
  50. /**
  51. * MQTT消息订阅绑定(消费者)
  52. */
  53. @Bean(name = TOPIC + "Inbound")
  54. public MessageProducer inbound() {
  55. return MqttConfigUtils.inbound(TOPIC, mqttInboundChannel());
  56. }
  57. /**
  58. * MQTT消息处理器(消费者)
  59. */
  60. @Bean(name = TOPIC + "Handler")
  61. @ServiceActivator(inputChannel = CHANNEL_NAME_IN)
  62. public MessageHandler handler() {
  63. return MqttConfigUtils.handler();
  64. }
  65. }