Spring 框架为与消息系统集成提供了广泛的支持,从 JMS API 的简化使用JmsTemplate到异步接收消息的完整基础设施。Spring AMQP 为高级消息队列协议提供了类似的功能集。RabbitTemplateSpring Boot 还为RabbitMQ提供自动配置选项。Spring WebSocket 本身包含对 STOMP 消息传递的支持,而 Spring Boot 通过启动器和少量自动配置对此提供支持。Spring Boot 还支持 Apache Kafka。

1.JMS

该接口提供了创建与 JMS 代理交互jakarta.jms.ConnectionFactory的标准方法。jakarta.jms.Connection尽管 Spring 需要ConnectionFactory使用 JMS,但您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。(有关详细信息,请参阅 Spring 框架参考文档的相关部分。)Spring Boot 还会自动配置发送和接收消息所需的基础设施。

1.1. ActiveMQ“经典”支持

ActiveMQ“Classic”在类路径上可用时,Spring Boot 可以配置一个ConnectionFactory.

如果您使用spring-boot-starter-activemq,则会提供连接到 ActiveMQ“经典”实例所需的依赖项,以及与 JMS 集成的 Spring 基础结构。

ActiveMQ“经典”配置由spring.activemq.*. 默认情况下,ActiveMQ“经典”自动配置为使用TCP 传输,默认连接到tcp://localhost:61616。以下示例显示如何更改默认代理 URL:

Properties
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
Yaml
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

默认情况下,a使用合理的设置包装CachingConnectionFactory本机,ConnectionFactory您可以通过以下中的外部配置属性来控制这些设置spring.jms.*

Properties
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

如果您更愿意使用本机池,可以通过添加依赖项org.messaginghub:pooled-jms并进行相应配置来实现JmsPoolConnectionFactory,如以下示例所示:

Properties
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
Yaml
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50
请参阅ActiveMQProperties参考资料 了解更多支持的选项。您还可以注册任意数量的 Bean 来实现ActiveMQConnectionFactoryCustomizer更高级的自定义。

默认情况下,ActiveMQ“经典”会创建一个目标(如果目标尚不存在),以便根据提供的名称解析目标。

1.2. ActiveMQ Artemis 支持

ConnectionFactory当 Spring Boot 检测到ActiveMQ Artemis在类路径上可用时,它可以自动配置 a 。如果代理存在,则会自动启动和配置嵌入式代理(除非已显式设置模式属性)。支持的模式是embedded(明确需要嵌入式代理,并且如果代理在类路径上不可用,则会发生错误)和native(使用传输协议连接到代理netty)。配置后者后,Spring Boot 将配置一个ConnectionFactory连接到使用默认设置在本地计算机上运行的代理的 。

如果您使用spring-boot-starter-artemis,则会提供连接到现有 ActiveMQ Artemis 实例所需的依赖项,以及与 JMS 集成的 Spring 基础设施。通过添加org.apache.activemq:artemis-jakarta-server到您的应用程序,您可以使用嵌入模式。

ActiveMQ Artemis 配置由spring.artemis.*. 例如,您可以在 中声明以下部分application.properties

Properties
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
Yaml
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

嵌入代理时,您可以选择是否要启用持久性并列出应可用的目标。可以将它们指定为逗号分隔的列表,以使用默认选项创建它们,或者您可以分别为高级队列和主题配置定义类型org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration或的 bean 。org.apache.activemq.artemis.jms.server.config.TopicConfiguration

默认情况下,a使用合理的设置包装CachingConnectionFactory本机,ConnectionFactory您可以通过以下中的外部配置属性来控制这些设置spring.jms.*

Properties
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

如果您更愿意使用本机池,可以通过添加依赖项org.messaginghub:pooled-jms并进行相应配置来实现JmsPoolConnectionFactory,如以下示例所示:

Properties
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
Yaml
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50

请参阅ArtemisProperties参考资料 了解更多支持的选项。

不涉及 JNDI 查找,并且使用nameActiveMQ Artemis 配置中的属性或通过配置提供的名称根据其名称解析目标。

1.3. 使用 JNDI ConnectionFactory

如果您在应用程序服务器中运行应用程序,Spring Boot 会尝试ConnectionFactory使用 JNDI 来查找 JMS。默认情况下,会检查java:/JmsXA和位置。java:/XAConnectionFactory如果需要指定替代位置,则可以使用该spring.jms.jndi-name属性,如下例所示:

Properties
spring.jms.jndi-name=java:/MyConnectionFactory
Yaml
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"

1.4. 发送消息

SpringJmsTemplate是自动配置的,您可以将其直接自动装配到您自己的 bean 中,如以下示例所示:

Java
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

    public void someMethod() {
        this.jmsTemplate.convertAndSend("hello");
    }

}
Kotlin
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val jmsTemplate: JmsTemplate) {

    // ...

    fun someMethod() {
        jmsTemplate.convertAndSend("hello")
    }

}
JmsMessagingTemplate可以用类似的方式注射。如果定义了一个DestinationResolver或一个MessageConverterbean,它会自动关联到自动配置的JmsTemplate.

1.5. 接收消息

当存在 JMS 基础设施时,可以对任何 Bean 进行注释@JmsListener以创建侦听器端点。如果JmsListenerContainerFactory未定义,则自动配置默认值。如果定义了 a DestinationResolver、 aMessageConverter或 a jakarta.jms.ExceptionListenerbeans,它们会自动与默认工厂关联。

默认情况下,默认工厂是事务性的。如果您在存在 a 的基础架构中运行JtaTransactionManager,则默认情况下它会与侦听器容器关联。如果没有,sessionTransacted则启用该标志。@Transactional在后一种情况下,您可以通过添加侦听器方法(或其委托)将本地数据存储事务与传入消息的处理关联起来。这可确保本地事务完成后传入的消息得到确认。这还包括发送在同一 JMS 会话上执行的响应消息。

以下组件在someQueue目标上创建侦听器端点:

Java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @JmsListener(destination = "someQueue")
    fun processMessage(content: String?) {
        // ...
    }

}
有关详细信息, 请参阅Javadoc 。@EnableJms

如果您需要创建更多JmsListenerContainerFactory实例或者想要覆盖默认值,Spring Boot 提供了一个实例DefaultJmsListenerContainerFactoryConfigurer,您可以使用它来初始化一个DefaultJmsListenerContainerFactory与自动配置的实例相同的设置。

例如,以下示例公开了另一个使用特定 的工厂MessageConverter

Java
import jakarta.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}
Kotlin
import jakarta.jms.ConnectionFactory
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory

@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {

    @Bean
    fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
        val factory = DefaultJmsListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }

    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }

}

然后您可以在任何带注释的方法中使用工厂,@JmsListener如下所示:

Java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }

}

2.AMQP

高级消息队列协议 (AMQP) 是面向消息中间件的平台中立、线路级协议。Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括spring-boot-starter-amqp“Starter”。

2.1. RabbitMQ 支持

RabbitMQ是一个基于 AMQP 协议的轻量级、可靠、可扩展、可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。

RabbitMQ 配置由spring.rabbitmq.*. 例如,您可以在 中声明以下部分application.properties

Properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
Yaml
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,您可以使用以下addresses属性配置相同的连接:

Properties
spring.rabbitmq.addresses=amqp://admin:secret@localhost
Yaml
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
当以这种方式指定地址时,hostport属性将被忽略。如果地址使用该amqps协议,则会自动启用 SSL 支持。

请参阅 参考资料RabbitProperties了解更多受支持的基于属性的配置选项。ConnectionFactory要配置Spring AMQP 使用的RabbitMQ 的较低级别详细信息,请定义一个ConnectionFactoryCustomizerbean。

如果ConnectionNameStrategy上下文中存在 bean,它将自动用于命名由自动配置的CachingConnectionFactory.

要对 进行应用程序范围的附加定制RabbitTemplate,请使用RabbitTemplateCustomizerbean。

有关更多详细信息 , 请参阅了解 RabbitMQ 使用的协议 AMQP 。

2.2. 发送消息

Spring 的AmqpTemplateAmqpAdmin是自动配置的,您可以将它们直接自动装配到您自己的 bean 中,如以下示例所示:

Java
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

    public void someMethod() {
        this.amqpAdmin.getQueueInfo("someQueue");
    }

    public void someOtherMethod() {
        this.amqpTemplate.convertAndSend("hello");
    }

}
Kotlin
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

    // ...

    fun someMethod() {
        amqpAdmin.getQueueInfo("someQueue")
    }

    fun someOtherMethod() {
        amqpTemplate.convertAndSend("hello")
    }

}
RabbitMessagingTemplate可以用类似的方式注射。如果MessageConverter定义了一个 bean,它会自动关联到自动配置的AmqpTemplate.

如有必要,任何org.springframework.amqp.core.Queue定义为 bean 的对象都会自动用于在 RabbitMQ 实例上声明相应的队列。

要重试操作,您可以启用重试AmqpTemplate(例如,在代理连接丢失的情况下):

Properties
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
Yaml
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

默认情况下禁用重试。您还可以RetryTemplate通过声明RabbitRetryTemplateCustomizerbean 以编程方式自定义。

如果您需要创建更多RabbitTemplate实例或者想要覆盖默认值,Spring Boot 提供了一个RabbitTemplateConfigurerbean,您可以使用该 bean 来初始化RabbitTemplate与自动配置所使用的工厂相同的设置。

2.3. 向流发送消息

要将消息发送到特定流,请指定流的名称,如以下示例所示:

Properties
spring.rabbitmq.stream.name=my-stream
Yaml
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定义了MessageConverterStreamMessageConverterProducerCustomizerbean,它会自动关联到自动配置的RabbitStreamTemplate.

如果您需要创建更多RabbitStreamTemplate实例或者想要覆盖默认值,Spring Boot 提供了一个RabbitStreamTemplateConfigurerbean,您可以使用该 bean 来初始化RabbitStreamTemplate与自动配置所使用的工厂相同的设置。

2.4. 接收消息

当 Rabbit 基础设施存在时,任何 bean 都可以被注释@RabbitListener以创建侦听器端点。如果RabbitListenerContainerFactory未定义,SimpleRabbitListenerContainerFactory则会自动配置默认值,您可以使用该spring.rabbitmq.listener.type属性切换到直接容器。如果定义了一个MessageConverter或一个MessageRecovererbean,它会自动与默认工厂关联。

以下示例组件在someQueue队列上创建侦听器端点:

Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @RabbitListener(queues = ["someQueue"])
    fun processMessage(content: String?) {
        // ...
    }

}
有关详细信息, 请参阅Javadoc 。@EnableRabbit

如果您需要创建更多RabbitListenerContainerFactory实例或者想要覆盖默认值,Spring Boot 提供了 aSimpleRabbitListenerContainerFactoryConfigurer和 a DirectRabbitListenerContainerFactoryConfigurer,您可以使用它们来初始化 aSimpleRabbitListenerContainerFactory和 a DirectRabbitListenerContainerFactory,其设置与自动配置所使用的工厂相同。

您选择哪种容器类型并不重要。这两个 bean 通过自动配置公开。

例如,以下配置类公开了另一个使用特定的工厂MessageConverter

Java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

    @Bean
    fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }

    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }

}

然后你可以在任何带注释的方法中使用工厂@RabbitListener,如下所示:

Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }

}

您可以启用重试来处理侦听器引发异常的情况。默认情况下,RejectAndDontRequeueRecoverer使用 ,但您可以定义MessageRecoverer自己的。当重试次数用尽时,消息将被拒绝并被丢弃或路由到死信交换(如果代理配置为这样做)。默认情况下,重试被禁用。您还可以RetryTemplate通过声明RabbitRetryTemplateCustomizerbean 以编程方式自定义。

默认情况下,如果禁用重试并且侦听器引发异常,则将无限期地重试传递。defaultRequeueRejected您可以通过两种方式修改此行为:将属性设置为false以便尝试零次重新传递,或者抛出AmqpRejectAndDontRequeueException来表示应拒绝消息。后者是启用重试并且达到最大传递尝试次数时使用的机制。

3. Apache Kafka 支持

Apache Kafka通过提供项目的自动配置来支持spring-kafka

Kafka 配置由spring.kafka.*. 例如,您可以在 中声明以下部分application.properties

Properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
Yaml
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
要在启动时创建主题,请添加类型为 的 bean NewTopic。如果主题已经存在,则忽略该 bean。

请参阅KafkaProperties参考资料 了解更多支持的选项。

3.1. 发送消息

SpringKafkaTemplate是自动配置的,您可以直接在自己的 bean 中自动装配它,如以下示例所示:

Java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}
Kotlin
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

    // ...

    fun someMethod() {
        kafkaTemplate.send("someTopic", "Hello")
    }

}
spring.kafka.producer.transaction-id-prefix如果定义了 该属性,KafkaTransactionManager则会自动配置 a。此外,如果RecordMessageConverter定义了一个 bean,它会自动关联到自动配置的KafkaTemplate.

3.2. 接收消息

当 Apache Kafka 基础设施存在时,可以对任何 bean 进行注释@KafkaListener以创建侦听器端点。如果未定义KafkaListenerContainerFactory,则会使用 中定义的键自动配置默认值spring.kafka.listener.*

以下组件在someTopic主题上创建侦听器端点:

Java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @KafkaListener(topics = ["someTopic"])
    fun processMessage(content: String?) {
        // ...
    }

}

如果KafkaTransactionManager定义了一个bean,它会自动关联到容器工厂。类似地,如果定义了RecordFilterStrategyCommonErrorHandler或bean,它会自动关联AfterRollbackProcessorConsumerAwareRebalanceListener默认工厂。

根据侦听器类型,aRecordMessageConverterBatchMessageConverterbean 会与默认工厂关联。如果RecordMessageConverter批处理侦听器仅存在一个 bean,则它将包装在BatchMessageConverter.

ChainedKafkaTransactionManager必须标记 自定义@Primary,因为它通常引用自动配置的KafkaTransactionManagerbean。

3.3. 卡夫卡流

Spring for Apache Kafka 提供了一个工厂 bean 来创建StreamsBuilder对象并管理其流的生命周期。KafkaStreamsConfiguration只要所需的beankafka-streams位于类路径中,并且通过@EnableKafkaStreams注释启用了 Kafka Streams,Spring Boot 就会自动配置所需的 bean。

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用 进行配置spring.kafka.streams.application-id,如果不设置则默认为spring.application.name。后者可以全局设置或仅针对流专门覆盖。

使用专用属性可以使用几个附加属性;可以使用命名空间设置其他任意 Kafka 属性spring.kafka.streams.properties。另请参阅其他 Kafka 属性以获取更多信息。

要使用工厂 bean,请按以下示例所示连接StreamsBuilder到您的工厂 bean:@Bean

Java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}
Kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

    @Bean
    fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
        val stream = streamsBuilder.stream<Int, String>("ks1In")
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
        return stream
    }

    private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
        return KeyValue(key, value.uppercase())
    }

}

默认情况下,对象管理的流StreamBuilder会自动启动。您可以使用spring.kafka.streams.auto-startup属性自定义此行为。

3.4. 其他 Kafka 属性

自动配置支持的属性显示在附录的“集成属性”部分中。请注意,在大多数情况下,这些属性(连字符或驼峰命名法)直接映射到 Apache Kafka 点分属性。有关详细信息,请参阅 Apache Kafka 文档。

名称中不包含客户端类型(producerconsumeradmin或)的属性被视为通用属性并适用于所有客户端。streams如果需要,可以为一种或多种客户端类型覆盖大多数常见属性。

Apache Kafka 将属性的重要性指定为“高”、“中”或“低”。Spring Boot 自动配置支持所有高重要性属性、一些选定的中和低属性以及任何没有默认值的属性。

只有 Kafka 支持的属性的子集可以直接通过该类获得KafkaProperties。如果您希望使用不直接支持的其他属性来配置各个客户端类型,请使用以下属性:

Properties
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
Yaml
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

这会将通用prop.oneKafka 属性设置为first(适用于生产者、消费者、管理员和流),将prop.two管理属性设置为second,将prop.three消费者属性设置为third,将prop.four生产者属性设置为fourth,将prop.five流属性设置为fifth

JsonDeserializer您还可以按如下方式配置Spring Kafka :

Properties
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
Yaml
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同样,您可以禁用JsonSerializer在标头中发送类型信息的默认行为:

Properties
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
Yaml
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
以这种方式设置的属性会覆盖 Spring Boot 明确支持的任何配置项。

3.5. 使用嵌入式 Kafka 进行测试

Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。@EmbeddedKafka要使用此功能,请使用from模块注释测试类spring-kafka-test。有关更多信息,请参阅 Spring for Apache Kafka参考手册

要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理配合使用,您需要将嵌入式代理地址的系统属性(由 填充)重新映射EmbeddedKafkaBroker到 Apache Kafka 的 Spring Boot 配置属性。有几种方法可以做到这一点:

  • 提供一个系统属性以将嵌入式代理地址映射到spring.kafka.bootstrap-servers测试类中:

Java
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
Kotlin
init {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
  • 在注释上配置属性名称@EmbeddedKafka

Java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
Kotlin
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
  • 在配置属性中使用占位符:

Properties
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Yaml
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"

4.RSocket

RSocket是一种用于字节流传输的二进制协议。它通过在单个连接上传递异步消息来实现对称交互模型。

Spring框架的模块spring-messaging为客户端和服务器端的RSocket请求者和响应者提供支持。有关更多详细信息,请参阅Spring 框架参考的RSocket 部分,包括 RSocket 协议的概述。

4.1. RSocket 策略自动配置

Spring Boot 自动配置一个RSocketStrategiesbean,提供编码和解码 RSocket 有效负载所需的所有基础设施。默认情况下,自动配置将尝试配置以下内容(按顺序):

  1. Jackson 的CBOR编解码器

  2. Jackson 的 JSON 编解码器

启动spring-boot-starter-rsocket器提供了这两个依赖项。请参阅Jackson 支持部分以了解有关自定义可能性的更多信息。

开发人员可以RSocketStrategies通过创建实现RSocketStrategiesCustomizer接口的 bean 来自定义组件。请注意,它们@Order很重要,因为它决定了编解码器的顺序。

4.2. RSocket服务器自动配置

Spring Boot 提供 RSocket 服务器自动配置。所需的依赖项由spring-boot-starter-rsocket.

Spring Boot 允许从 WebFlux 服务器通过 WebSocket 公开 RSocket,或者建立独立的 RSocket 服务器。这取决于应用程序的类型及其配置。

对于 WebFlux 应用程序(类型为WebApplicationType.REACTIVE),仅当以下属性匹配时,RSocket 服务器才会插入 Web 服务器:

Properties
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
Yaml
spring:
  rsocket:
    server:
      mapping-path: "/rsocket"
      transport: "websocket"
仅 Reactor Netty 支持将 RSocket 插入 Web 服务器,因为 RSocket 本身就是使用该库构建的。

或者,RSocket TCP 或 websocket 服务器作为独立的嵌入式服务器启动。除了依赖性要求之外,唯一需要的配置是为该服务器定义一个端口:

Properties
spring.rsocket.server.port=9898
Yaml
spring:
  rsocket:
    server:
      port: 9898

4.3. Spring Messaging RSocket 支持

Spring Boot 将为 RSocket 自动配置 Spring Messaging 基础设施。

这意味着 Spring Boot 将创建一个RSocketMessageHandlerbean 来处理对应用程序的 RSocket 请求。

4.4. 使用 RSocketRequester 调用 RSocket 服务

一旦RSocket服务器和客户端之间建立了通道,任何一方都可以向另一方发送或接收请求。

RSocketRequester作为服务器,您可以在 RSocket 的任何处理程序方法上注入实例@Controller。作为客户端,您需要首先配置并建立RSocket连接。Spring Boot 会使用预期的编解码器自动配置RSocketRequester.Builder此类情况并应用任何RSocketConnectorConfigurerbean。

RSocketRequester.Builder实例是一个原型bean,这意味着每个注入点都会为您提供一个新实例。这是有意完成的,因为此构建器是有状态的,您不应使用同一实例创建具有不同设置的请求者。

下面的代码展示了一个典型的例子:

Java
import reactor.core.publisher.Mono;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
        this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
    }

    public Mono<User> someRSocketCall(String name) {
        return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
    }

}
Kotlin
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {

    private val rsocketRequester: RSocketRequester

    init {
        rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
    }

    fun someRSocketCall(name: String): Mono<User> {
        return rsocketRequester.route("user").data(name).retrieveMono(
            User::class.java
        )
    }

}

5.Spring集成

Spring Boot 为使用Spring Integration提供了多种便利,包括spring-boot-starter-integration“Starter”。Spring Integration 提供了对消息传递以及其他传输(例如 HTTP、TCP 等)的抽象。如果 Spring Integration 在您的类路径上可用,它将通过@EnableIntegration注释进行初始化。

Spring Integration 轮询逻辑依赖于自动配置的TaskScheduler. 默认值PollerMetadata(每秒轮询无限数量的消息)可以使用spring.integration.poller.*配置属性进行自定义。

Spring Boot 还配置了一些由其他 Spring Integration 模块的存在触发的功能。如果spring-integration-jmx也在类路径上,则消息处理统计信息将通过 JMX 发布。如果spring-integration-jdbc可用,则可以在启动时创建默认数据库架构,如下行所示:

Properties
spring.integration.jdbc.initialize-schema=always
Yaml
spring:
  integration:
    jdbc:
      initialize-schema: "always"

如果spring-integration-rsocket可用,开发人员可以使用属性配置 RSocket 服务器"spring.rsocket.server.*",并让它使用IntegrationRSocketEndpointRSocketOutboundGateway组件来处理传入的 RSocket 消息。该基础设施可以处理 Spring Integration RSocket 通道适配器和@MessageMapping处理程序(已"spring.integration.rsocket.server.message-mapping-enabled"配置)。

Spring Boot 还可以使用配置属性自动配置ClientRSocketConnector

Properties
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
Yaml
# Connecting to a RSocket server over TCP
spring:
  integration:
    rsocket:
      client:
        host: "example.org"
        port: 9898
Properties
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
Yaml
# Connecting to a RSocket Server over WebSocket
spring:
  integration:
    rsocket:
      client:
        uri: "ws://example.org"

有关更多详细信息,请参阅IntegrationAutoConfigurationIntegrationProperties类。

6.WebSockets

Spring Boot 为嵌入式 Tomcat、Jetty 和 Undertow 提供 WebSockets 自动配置。如果将 war 文件部署到独立容器,Spring Boot 会假定该容器负责配置其 WebSocket 支持。

Spring框架为MVC Web应用程序提供了丰富的WebSocket支持,可以通过spring-boot-starter-websocket模块轻松访问。

WebSocket 支持也可用于反应式 Web 应用程序,并且需要同时包含 WebSocket API spring-boot-starter-webflux

<dependency>
    <groupId>jakarta.websocket</groupId>
    <artifactId>jakarta.websocket-api</artifactId>
</dependency>

7. 接下来读什么

下一节将介绍如何在应用程序中启用IO 功能。您可以在本节中阅读有关缓存邮件验证休息客户端等的内容。