Spring 框架为与消息系统集成提供了广泛的支持,从 JMS API 的简化使用JmsTemplate到异步接收消息的完整基础设施。Spring AMQP 为高级消息队列协议提供了类似的功能集。RabbitTemplateSpring Boot 还为RabbitMQ提供自动配置选项。Spring WebSocket 本身包含对 STOMP 消息传递的支持,而 Spring Boot 通过启动器和少量自动配置对此提供支持。Spring Boot 还支持 Apache Kafka。
1.JMS
该接口提供了创建与 JMS 代理交互jakarta.jms.ConnectionFactory的标准方法。jakarta.jms.Connection尽管 Spring 需要ConnectionFactory使用 JMS,但您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。(有关详细信息,请参阅 Spring 框架参考文档的相关部分。)Spring Boot 还会自动配置发送和接收消息所需的基础设施。
1.1. ActiveMQ“经典”支持
当ActiveMQ“Classic”在类路径上可用时,Spring Boot 可以配置一个ConnectionFactory.
| 如果您使用 spring-boot-starter-activemq,则会提供连接到 ActiveMQ“经典”实例所需的依赖项,以及与 JMS 集成的 Spring 基础结构。 | 
ActiveMQ“经典”配置由spring.activemq.*. 默认情况下,ActiveMQ“经典”自动配置为使用TCP 传输,默认连接到tcp://localhost:61616。以下示例显示如何更改默认代理 URL:
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secretspring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"默认情况下,a使用合理的设置包装CachingConnectionFactory本机,ConnectionFactory您可以通过以下中的外部配置属性来控制这些设置spring.jms.*:
spring.jms.cache.session-cache-size=5spring:
  jms:
    cache:
      session-cache-size: 5如果您更愿意使用本机池,可以通过添加依赖项org.messaginghub:pooled-jms并进行相应配置来实现JmsPoolConnectionFactory,如以下示例所示:
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50| 请参阅 ActiveMQProperties参考资料 了解更多支持的选项。您还可以注册任意数量的 Bean 来实现ActiveMQConnectionFactoryCustomizer更高级的自定义。 | 
默认情况下,ActiveMQ“经典”会创建一个目标(如果目标尚不存在),以便根据提供的名称解析目标。
1.2. ActiveMQ Artemis 支持
ConnectionFactory当 Spring Boot 检测到ActiveMQ Artemis在类路径上可用时,它可以自动配置 a 。如果代理存在,则会自动启动和配置嵌入式代理(除非已显式设置模式属性)。支持的模式是embedded(明确需要嵌入式代理,并且如果代理在类路径上不可用,则会发生错误)和native(使用传输协议连接到代理netty)。配置后者后,Spring Boot 将配置一个ConnectionFactory连接到使用默认设置在本地计算机上运行的代理的 。
| 如果您使用 spring-boot-starter-artemis,则会提供连接到现有 ActiveMQ Artemis 实例所需的依赖项,以及与 JMS 集成的 Spring 基础设施。通过添加org.apache.activemq:artemis-jakarta-server到您的应用程序,您可以使用嵌入模式。 | 
ActiveMQ Artemis 配置由spring.artemis.*. 例如,您可以在 中声明以下部分application.properties:
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secretspring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"嵌入代理时,您可以选择是否要启用持久性并列出应可用的目标。可以将它们指定为逗号分隔的列表,以使用默认选项创建它们,或者您可以分别为高级队列和主题配置定义类型org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration或的 bean 。org.apache.activemq.artemis.jms.server.config.TopicConfiguration
默认情况下,a使用合理的设置包装CachingConnectionFactory本机,ConnectionFactory您可以通过以下中的外部配置属性来控制这些设置spring.jms.*:
spring.jms.cache.session-cache-size=5spring:
  jms:
    cache:
      session-cache-size: 5如果您更愿意使用本机池,可以通过添加依赖项org.messaginghub:pooled-jms并进行相应配置来实现JmsPoolConnectionFactory,如以下示例所示:
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50请参阅ArtemisProperties参考资料 了解更多支持的选项。
不涉及 JNDI 查找,并且使用nameActiveMQ Artemis 配置中的属性或通过配置提供的名称根据其名称解析目标。
1.3. 使用 JNDI ConnectionFactory
如果您在应用程序服务器中运行应用程序,Spring Boot 会尝试ConnectionFactory使用 JNDI 来查找 JMS。默认情况下,会检查java:/JmsXA和位置。java:/XAConnectionFactory如果需要指定替代位置,则可以使用该spring.jms.jndi-name属性,如下例所示:
spring.jms.jndi-name=java:/MyConnectionFactoryspring:
  jms:
    jndi-name: "java:/MyConnectionFactory"1.4. 发送消息
SpringJmsTemplate是自动配置的,您可以将其直接自动装配到您自己的 bean 中,如以下示例所示:
@Component
public class MyBean {
    private final JmsTemplate jmsTemplate;
    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
}
@Component
class MyBean(private val jmsTemplate: JmsTemplate) {
}
| JmsMessagingTemplate可以用类似的方式注射。如果定义了一个DestinationResolver或一个MessageConverterbean,它会自动关联到自动配置的JmsTemplate. | 
1.5. 接收消息
当存在 JMS 基础设施时,可以对任何 Bean 进行注释@JmsListener以创建侦听器端点。如果JmsListenerContainerFactory未定义,则自动配置默认值。如果定义了 a DestinationResolver、 aMessageConverter或 a jakarta.jms.ExceptionListenerbeans,它们会自动与默认工厂关联。
默认情况下,默认工厂是事务性的。如果您在存在 a 的基础架构中运行JtaTransactionManager,则默认情况下它会与侦听器容器关联。如果没有,sessionTransacted则启用该标志。@Transactional在后一种情况下,您可以通过添加侦听器方法(或其委托)将本地数据存储事务与传入消息的处理关联起来。这可确保本地事务完成后传入的消息得到确认。这还包括发送在同一 JMS 会话上执行的响应消息。
以下组件在someQueue目标上创建侦听器端点:
@Component
public class MyBean {
    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }
}
@Component
class MyBean {
    @JmsListener(destination = "someQueue")
    fun processMessage(content: String?) {
        // ...
    }
}
| 有关详细信息,
请参阅Javadoc 。 @EnableJms | 
如果您需要创建更多JmsListenerContainerFactory实例或者想要覆盖默认值,Spring Boot 提供了一个实例DefaultJmsListenerContainerFactoryConfigurer,您可以使用它来初始化一个DefaultJmsListenerContainerFactory与自动配置的实例相同的设置。
例如,以下示例公开了另一个使用特定 的工厂MessageConverter:
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {
    @Bean
    public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }
    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }
}
@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {
    @Bean
    fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
        val factory = DefaultJmsListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }
    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }
}
然后您可以在任何带注释的方法中使用工厂,@JmsListener如下所示:
@Component
public class MyBean {
    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }
}
@Component
class MyBean {
    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }
}
2.AMQP
高级消息队列协议 (AMQP) 是面向消息中间件的平台中立、线路级协议。Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括spring-boot-starter-amqp“Starter”。
2.1. RabbitMQ 支持
RabbitMQ是一个基于 AMQP 协议的轻量级、可靠、可扩展、可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。
RabbitMQ 配置由spring.rabbitmq.*. 例如,您可以在 中声明以下部分application.properties:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secretspring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"或者,您可以使用以下addresses属性配置相同的连接:
spring.rabbitmq.addresses=amqp://admin:secret@localhostspring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"| 当以这种方式指定地址时, host和port属性将被忽略。如果地址使用该amqps协议,则会自动启用 SSL 支持。 | 
请参阅 参考资料RabbitProperties了解更多受支持的基于属性的配置选项。ConnectionFactory要配置Spring AMQP 使用的RabbitMQ 的较低级别详细信息,请定义一个ConnectionFactoryCustomizerbean。
如果ConnectionNameStrategy上下文中存在 bean,它将自动用于命名由自动配置的CachingConnectionFactory.
要对 进行应用程序范围的附加定制RabbitTemplate,请使用RabbitTemplateCustomizerbean。
| 有关更多详细信息 , 请参阅了解 RabbitMQ 使用的协议 AMQP 。 | 
2.2. 发送消息
Spring 的AmqpTemplate和AmqpAdmin是自动配置的,您可以将它们直接自动装配到您自己的 bean 中,如以下示例所示:
@Component
public class MyBean {
    private final AmqpAdmin amqpAdmin;
    private final AmqpTemplate amqpTemplate;
    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }
}
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
}
| RabbitMessagingTemplate可以用类似的方式注射。如果MessageConverter定义了一个 bean,它会自动关联到自动配置的AmqpTemplate. | 
如有必要,任何org.springframework.amqp.core.Queue定义为 bean 的对象都会自动用于在 RabbitMQ 实例上声明相应的队列。
要重试操作,您可以启用重试AmqpTemplate(例如,在代理连接丢失的情况下):
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2sspring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"默认情况下禁用重试。您还可以RetryTemplate通过声明RabbitRetryTemplateCustomizerbean 以编程方式自定义。
如果您需要创建更多RabbitTemplate实例或者想要覆盖默认值,Spring Boot 提供了一个RabbitTemplateConfigurerbean,您可以使用该 bean 来初始化RabbitTemplate与自动配置所使用的工厂相同的设置。
2.3. 向流发送消息
要将消息发送到特定流,请指定流的名称,如以下示例所示:
spring.rabbitmq.stream.name=my-streamspring:
  rabbitmq:
    stream:
      name: "my-stream"如果定义了MessageConverter、StreamMessageConverter或ProducerCustomizerbean,它会自动关联到自动配置的RabbitStreamTemplate.
如果您需要创建更多RabbitStreamTemplate实例或者想要覆盖默认值,Spring Boot 提供了一个RabbitStreamTemplateConfigurerbean,您可以使用该 bean 来初始化RabbitStreamTemplate与自动配置所使用的工厂相同的设置。
2.4. 接收消息
当 Rabbit 基础设施存在时,任何 bean 都可以被注释@RabbitListener以创建侦听器端点。如果RabbitListenerContainerFactory未定义,SimpleRabbitListenerContainerFactory则会自动配置默认值,您可以使用该spring.rabbitmq.listener.type属性切换到直接容器。如果定义了一个MessageConverter或一个MessageRecovererbean,它会自动与默认工厂关联。
以下示例组件在someQueue队列上创建侦听器端点:
@Component
public class MyBean {
    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }
}
@Component
class MyBean {
    @RabbitListener(queues = ["someQueue"])
    fun processMessage(content: String?) {
        // ...
    }
}
| 有关详细信息,
请参阅Javadoc 。 @EnableRabbit | 
如果您需要创建更多RabbitListenerContainerFactory实例或者想要覆盖默认值,Spring Boot 提供了 aSimpleRabbitListenerContainerFactoryConfigurer和 a DirectRabbitListenerContainerFactoryConfigurer,您可以使用它们来初始化 aSimpleRabbitListenerContainerFactory和 a DirectRabbitListenerContainerFactory,其设置与自动配置所使用的工厂相同。
| 您选择哪种容器类型并不重要。这两个 bean 通过自动配置公开。 | 
例如,以下配置类公开了另一个使用特定的工厂MessageConverter:
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }
    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }
}
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
    @Bean
    fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }
    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }
}
然后你可以在任何带注释的方法中使用工厂@RabbitListener,如下所示:
@Component
public class MyBean {
    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }
}
@Component
class MyBean {
    @RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }
}
您可以启用重试来处理侦听器引发异常的情况。默认情况下,RejectAndDontRequeueRecoverer使用 ,但您可以定义MessageRecoverer自己的。当重试次数用尽时,消息将被拒绝并被丢弃或路由到死信交换(如果代理配置为这样做)。默认情况下,重试被禁用。您还可以RetryTemplate通过声明RabbitRetryTemplateCustomizerbean 以编程方式自定义。
| 默认情况下,如果禁用重试并且侦听器引发异常,则将无限期地重试传递。 defaultRequeueRejected您可以通过两种方式修改此行为:将属性设置为false以便尝试零次重新传递,或者抛出AmqpRejectAndDontRequeueException来表示应拒绝消息。后者是启用重试并且达到最大传递尝试次数时使用的机制。 | 
3. Apache Kafka 支持
Apache Kafka通过提供项目的自动配置来支持spring-kafka。
Kafka 配置由spring.kafka.*. 例如,您可以在 中声明以下部分application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroupspring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"| 要在启动时创建主题,请添加类型为 的 bean NewTopic。如果主题已经存在,则忽略该 bean。 | 
请参阅KafkaProperties参考资料 了解更多支持的选项。
3.1. 发送消息
SpringKafkaTemplate是自动配置的,您可以直接在自己的 bean 中自动装配它,如以下示例所示:
@Component
public class MyBean {
    private final KafkaTemplate<String, String> kafkaTemplate;
    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
}
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
}
| spring.kafka.producer.transaction-id-prefix如果定义了
该属性,KafkaTransactionManager则会自动配置 a。此外,如果RecordMessageConverter定义了一个 bean,它会自动关联到自动配置的KafkaTemplate. | 
3.2. 接收消息
当 Apache Kafka 基础设施存在时,可以对任何 bean 进行注释@KafkaListener以创建侦听器端点。如果未定义KafkaListenerContainerFactory,则会使用 中定义的键自动配置默认值spring.kafka.listener.*。
以下组件在someTopic主题上创建侦听器端点:
@Component
public class MyBean {
    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }
}
@Component
class MyBean {
    @KafkaListener(topics = ["someTopic"])
    fun processMessage(content: String?) {
        // ...
    }
}
如果KafkaTransactionManager定义了一个bean,它会自动关联到容器工厂。类似地,如果定义了RecordFilterStrategy、CommonErrorHandler或bean,它会自动关联AfterRollbackProcessor到ConsumerAwareRebalanceListener默认工厂。
根据侦听器类型,aRecordMessageConverter或BatchMessageConverterbean 会与默认工厂关联。如果RecordMessageConverter批处理侦听器仅存在一个 bean,则它将包装在BatchMessageConverter.
| ChainedKafkaTransactionManager必须标记
自定义@Primary,因为它通常引用自动配置的KafkaTransactionManagerbean。 | 
3.3. 卡夫卡流
Spring for Apache Kafka 提供了一个工厂 bean 来创建StreamsBuilder对象并管理其流的生命周期。KafkaStreamsConfiguration只要所需的beankafka-streams位于类路径中,并且通过@EnableKafkaStreams注释启用了 Kafka Streams,Spring Boot 就会自动配置所需的 bean。
启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用 进行配置spring.kafka.streams.application-id,如果不设置则默认为spring.application.name。后者可以全局设置或仅针对流专门覆盖。
使用专用属性可以使用几个附加属性;可以使用命名空间设置其他任意 Kafka 属性spring.kafka.streams.properties。另请参阅其他 Kafka 属性以获取更多信息。
要使用工厂 bean,请按以下示例所示连接StreamsBuilder到您的工厂 bean:@Bean
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }
    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }
}
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
    @Bean
    fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
        val stream = streamsBuilder.stream<Int, String>("ks1In")
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
        return stream
    }
    private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
        return KeyValue(key, value.uppercase())
    }
}
默认情况下,对象管理的流StreamBuilder会自动启动。您可以使用spring.kafka.streams.auto-startup属性自定义此行为。
3.4. 其他 Kafka 属性
自动配置支持的属性显示在附录的“集成属性”部分中。请注意,在大多数情况下,这些属性(连字符或驼峰命名法)直接映射到 Apache Kafka 点分属性。有关详细信息,请参阅 Apache Kafka 文档。
名称中不包含客户端类型(producer、consumer、admin或)的属性被视为通用属性并适用于所有客户端。streams如果需要,可以为一种或多种客户端类型覆盖大多数常见属性。
Apache Kafka 将属性的重要性指定为“高”、“中”或“低”。Spring Boot 自动配置支持所有高重要性属性、一些选定的中和低属性以及任何没有默认值的属性。
只有 Kafka 支持的属性的子集可以直接通过该类获得KafkaProperties。如果您希望使用不直接支持的其他属性来配置各个客户端类型,请使用以下属性:
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifthspring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"这会将通用prop.oneKafka 属性设置为first(适用于生产者、消费者、管理员和流),将prop.two管理属性设置为second,将prop.three消费者属性设置为third,将prop.four生产者属性设置为fourth,将prop.five流属性设置为fifth。
JsonDeserializer您还可以按如下方式配置Spring Kafka :
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.anotherspring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"同样,您可以禁用JsonSerializer在标头中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=falsespring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false| 以这种方式设置的属性会覆盖 Spring Boot 明确支持的任何配置项。 | 
3.5. 使用嵌入式 Kafka 进行测试
Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。@EmbeddedKafka要使用此功能,请使用from模块注释测试类spring-kafka-test。有关更多信息,请参阅 Spring for Apache Kafka参考手册。
要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理配合使用,您需要将嵌入式代理地址的系统属性(由 填充)重新映射EmbeddedKafkaBroker到 Apache Kafka 的 Spring Boot 配置属性。有几种方法可以做到这一点:
- 
提供一个系统属性以将嵌入式代理地址映射到 spring.kafka.bootstrap-servers测试类中:
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
- 
在注释上配置属性名称 @EmbeddedKafka:
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
    // ...
}
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
    // ...
}
- 
在配置属性中使用占位符: 
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"4.RSocket
RSocket是一种用于字节流传输的二进制协议。它通过在单个连接上传递异步消息来实现对称交互模型。
Spring框架的模块spring-messaging为客户端和服务器端的RSocket请求者和响应者提供支持。有关更多详细信息,请参阅Spring 框架参考的RSocket 部分,包括 RSocket 协议的概述。
4.1. RSocket 策略自动配置
Spring Boot 自动配置一个RSocketStrategiesbean,提供编码和解码 RSocket 有效负载所需的所有基础设施。默认情况下,自动配置将尝试配置以下内容(按顺序):
- 
Jackson 的CBOR编解码器 
- 
Jackson 的 JSON 编解码器 
启动spring-boot-starter-rsocket器提供了这两个依赖项。请参阅Jackson 支持部分以了解有关自定义可能性的更多信息。
开发人员可以RSocketStrategies通过创建实现RSocketStrategiesCustomizer接口的 bean 来自定义组件。请注意,它们@Order很重要,因为它决定了编解码器的顺序。
4.2. RSocket服务器自动配置
Spring Boot 提供 RSocket 服务器自动配置。所需的依赖项由spring-boot-starter-rsocket.
Spring Boot 允许从 WebFlux 服务器通过 WebSocket 公开 RSocket,或者建立独立的 RSocket 服务器。这取决于应用程序的类型及其配置。
对于 WebFlux 应用程序(类型为WebApplicationType.REACTIVE),仅当以下属性匹配时,RSocket 服务器才会插入 Web 服务器:
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocketspring:
  rsocket:
    server:
      mapping-path: "/rsocket"
      transport: "websocket"| 仅 Reactor Netty 支持将 RSocket 插入 Web 服务器,因为 RSocket 本身就是使用该库构建的。 | 
或者,RSocket TCP 或 websocket 服务器作为独立的嵌入式服务器启动。除了依赖性要求之外,唯一需要的配置是为该服务器定义一个端口:
spring.rsocket.server.port=9898spring:
  rsocket:
    server:
      port: 98984.3. Spring Messaging RSocket 支持
Spring Boot 将为 RSocket 自动配置 Spring Messaging 基础设施。
这意味着 Spring Boot 将创建一个RSocketMessageHandlerbean 来处理对应用程序的 RSocket 请求。
4.4. 使用 RSocketRequester 调用 RSocket 服务
一旦RSocket服务器和客户端之间建立了通道,任何一方都可以向另一方发送或接收请求。
RSocketRequester作为服务器,您可以在 RSocket 的任何处理程序方法上注入实例@Controller。作为客户端,您需要首先配置并建立RSocket连接。Spring Boot 会使用预期的编解码器自动配置RSocketRequester.Builder此类情况并应用任何RSocketConnectorConfigurerbean。
该RSocketRequester.Builder实例是一个原型bean,这意味着每个注入点都会为您提供一个新实例。这是有意完成的,因为此构建器是有状态的,您不应使用同一实例创建具有不同设置的请求者。
下面的代码展示了一个典型的例子:
@Service
public class MyService {
    private final RSocketRequester rsocketRequester;
    public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
        this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
    }
    public Mono<User> someRSocketCall(String name) {
        return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
    }
}
@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {
    private val rsocketRequester: RSocketRequester
    init {
        rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
    }
    fun someRSocketCall(name: String): Mono<User> {
        return rsocketRequester.route("user").data(name).retrieveMono(
            User::class.java
        )
    }
}
5.Spring集成
Spring Boot 为使用Spring Integration提供了多种便利,包括spring-boot-starter-integration“Starter”。Spring Integration 提供了对消息传递以及其他传输(例如 HTTP、TCP 等)的抽象。如果 Spring Integration 在您的类路径上可用,它将通过@EnableIntegration注释进行初始化。
Spring Integration 轮询逻辑依赖于自动配置的TaskScheduler. 默认值PollerMetadata(每秒轮询无限数量的消息)可以使用spring.integration.poller.*配置属性进行自定义。
Spring Boot 还配置了一些由其他 Spring Integration 模块的存在触发的功能。如果spring-integration-jmx也在类路径上,则消息处理统计信息将通过 JMX 发布。如果spring-integration-jdbc可用,则可以在启动时创建默认数据库架构,如下行所示:
spring.integration.jdbc.initialize-schema=alwaysspring:
  integration:
    jdbc:
      initialize-schema: "always"如果spring-integration-rsocket可用,开发人员可以使用属性配置 RSocket 服务器"spring.rsocket.server.*",并让它使用IntegrationRSocketEndpoint或RSocketOutboundGateway组件来处理传入的 RSocket 消息。该基础设施可以处理 Spring Integration RSocket 通道适配器和@MessageMapping处理程序(已"spring.integration.rsocket.server.message-mapping-enabled"配置)。
Spring Boot 还可以使用配置属性自动配置ClientRSocketConnector:
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898# Connecting to a RSocket server over TCP
spring:
  integration:
    rsocket:
      client:
        host: "example.org"
        port: 9898# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org# Connecting to a RSocket Server over WebSocket
spring:
  integration:
    rsocket:
      client:
        uri: "ws://example.org"有关更多详细信息,请参阅IntegrationAutoConfiguration和IntegrationProperties类。
6.WebSockets
Spring Boot 为嵌入式 Tomcat、Jetty 和 Undertow 提供 WebSockets 自动配置。如果将 war 文件部署到独立容器,Spring Boot 会假定该容器负责配置其 WebSocket 支持。
Spring框架为MVC Web应用程序提供了丰富的WebSocket支持,可以通过spring-boot-starter-websocket模块轻松访问。
WebSocket 支持也可用于反应式 Web 应用程序,并且需要同时包含 WebSocket API spring-boot-starter-webflux:
<dependency>
    <groupId>jakarta.websocket</groupId>
    <artifactId>jakarta.websocket-api</artifactId>
</dependency>