Spring Cloud(十七):消息驱动之Spring Cloud Stream 编程模型

本篇主要描述 Spring Cloud Stream 编程模型的三个核心概念:Destination Binders(目标绑定器)、Destination Bindings(目标绑定)、Message(消息),还包含错误处理。

理解这些核心概念,了解具体使用及其背后一些运行机制,可以更好的理解 Spring Cloud Stream 这款组件。

Spring Cloud Stream 官方文档Spring Cloud Stream ProjectGithub > Spring Cloud StreamGithub > Spring-Retry

要理解 Stream 编程模型,需要熟悉以下核心概念:

  • Destination Binders(目标绑定器):负责提供与外部消息传递系统(消息中间件)集成的组件。
  • Destination Bindings(目标绑定):为外部消息传递系统(消息中间件)和应用程序提供的消息生产者和消费者之间提供桥接(由目标绑定器创建)。
  • Message(消息):生产者和消费者用来与目标绑定器(以及通过外部消息系统的其他应用程序)通信的规范数据结构。

Spring Cloud Stream Programming Model

Destination Binders

目标绑定器(Destination Binders)是 Spring Cloud Stream 的扩展组件,负责提供必要的配置和实现,以促进与外部消息传递系统的集成。此集成负责生产者与消费者之间的消息的连接、委派和路由、数据类型转换,用户代码的调用等。

绑定器负责处理许多样板事件,否则需要自己处理。 然而,要实现这一点,绑定器仍需要用户提供一些简约的指令集的帮助,这些指令通常以某种配置的方式提供。

Destination Bindings

如前所述,Destination Bindings 在外部消息传递系统与应用程序提供的生产者和消费者之间提供了桥接。

@EnableBinding 注解

在应用的配置类上使用 @EnableBinding 注解来指定一个或多个定义了 @Input@Output 的接口,以些实现对消息通道(Channel)的绑定。
@EnableBinding 注解本身集成了 @Configuration 注释,并触发 Spring Cloud Stream 基础结构的配置。

以下示例显示了一个可运行的 Spring Cloud Stream 应用,从 INPUT 通道接收消息,打印到控制台,并将其转换为大写后发送到 OUTPUT 通道。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyApplication {

public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String handle(String value) {
System.out.println("Received: " + value);
return value.toUpperCase();
}
}

@EnableBinding 注解的参数可以是一个或多个接口类,这些接口被称为绑定,它们包含了绑定消息通道的方法(组件),这些组件通常是基于通道绑定器(如 Rabbit、Kafka 和其他)的消息通道–译:绑定消息通道的实现(绑定器)需要消息中间件提供支持。

其他类型的绑定可以通过相应的技术为原生特性提供支持。例如,Kafka Streams binder(以前称为 KStream)允许直接绑定到 Kafka Streams。

Binding Interface

Spring Cloud Stream 为常见的的消息交换提供了绑定接口,其中包括:

  • Sink(接收器):定义消费者消费消息的目标,一个可绑定的输入通道接口,INPUT 属性定义通道名,默认为 input
  • Source(发送器):定义生产者生产消息的目标,一个可绑定的输出通道接口,OUTPUT 属性定义通道名,默认为 output
  • Processor(处理器):继承了 SourceSink 接口,同时具有输入输出接口和属性。
  1. Sink 接口
1
2
3
4
5
6
7
public interface Sink {

String INPUT = "input";

@Input(Sink.INPUT)
SubscribableChannel input();
}
  1. Source 接口
1
2
3
4
5
6
7
public interface Source {

String OUTPUT = "output";

@Output(Source.OUTPUT)
MessageChannel output();
}
  1. Processor 接口
    1
    public interface Processor extends Source, Sink {}

自定义绑定接口

也可以使用 @input@output 注解来自定义绑定消息通道的接口。如下示例:

1
2
3
4
5
6
7
8
9
10
11
public interface Barista {

@Input
SubscribableChannel orders();

@Output
MessageChannel hotDrinks();

@Output
MessageChannel coldDrinks();
}

上面的示例中的接口作为 @EnableBinding 的参数,分别触发创建名为 ordershotDrinkscoldDrinks 的三个绑定通道。

@EnableBinding 可指定多个接口来绑定消息通道,如下示例:

1
@EnableBinding(value = { Orders.class, Payment.class })

在 Spring Cloud Stream 中,可绑定的 MessageChannel 组件(接口)是 Spring Messaging 提供的 MessageChannel(定义发送消息的方法) 及其扩展,SubscribableChannel(维护订阅注册表,并调用消费该通道的消息)。

可轮询目标绑定

前面描述的绑定支持是基于事件的消息消费,但有时需要更多控制,例如消耗率。从2.0版开始,可以绑定可轮询的消费者。

以下示例显示如何绑定可轮询消费者:

1
2
3
4
5
6
public interface PolledBarista {

@Input
PollableMessageSource orders();
. . .
}

在这种情况下,PollableMessageSource 的实现绑定到 orders 通道。 有关更多详细信息,可参见 Section 29.3.5, “Using Polled Consumers” 章节。

自定义通道名称

使用 @Input@Output 注解,可以自定义通道名称,如下示例:

1
2
3
4
5
public interface Barista {
//创建名为 inboundOrders 的通道。
@Input("inboundOrders")
SubscribableChannel orders();
}

通常,不需要直接访问单个通道或绑定(除了通过 @EnableBinding 注解配置它们) 。但有时也会需要访问,如测试或其它情况。

注入接口和通道发送消息

除了为每个绑定生成通道并将其注册为 Spring Bean 之外,对于每个绑定的接口,SpringCloud Stream 还生成一个实现接口的 bean。这样可以通过在应用程序中注入 Bean 来访问绑定或单个通道的接口,如下两个示例所示:

  1. 注入绑定接口

    1
    2
    3
    4
    5
    6
    @Autowire
    private Source source

    public void sayHello(String name) {
    source.output().send(MessageBuilder.withPayload(name).build());
    }
  2. 注入单独通道

    1
    2
    3
    4
    5
    6
    @Autowire
    private MessageChannel output;

    public void sayHello(String name) {
    output.send(MessageBuilder.withPayload(name).build());
    }

    还可以使用标准 Spring 的 @Qualifier 注解来自定义通道名称,或在多个通道场景中需要对指定的通道进行命名。如下示例:

    1
    2
    3
    @Autowire
    @Qualifier("myChannel")
    private MessageChannel output;

生产和消费消息

Producing and Consuming Messages(生产和消费消息):可以使用 Spring Integration 注解 和 Spring Cloud Stream 原生注解来编写 Spring Cloud Stream 应用程序。

Spring Integration 支持

Spring Cloud Stream 建立在 Enterprise Integration Patterns 定义的概念和模式之上,其内部实现依赖于 Spring 项目组中已经建立和流行的企业集成模式(Enterprise Integration Patterns): Spring Integration 框架。

因此,自然支持 Spring Integration 已经建立的基础,语义和配置选项。

例如,可以将 Source 的输出通道连接到 MessageSource 并使用熟悉的 @InboundChannelAdapter 注解注释,如下所示:

1
2
3
4
5
6
7
8
9
@EnableBinding(Source.class)
public class TimerSource {

@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}
}

同样,可以使用 @Transformer@ServiceActivator 注解,同时为处理器绑定契约提供消息处理方法的实现,如以下示例所示:

1
2
3
4
5
6
7
@EnableBinding(Processor.class)
public class TransformProcessor {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
return message.toUpperCase();
}
}

备注: 当使用 @StreamListener 注解从相同的绑定中消费时,将使用 pub-sub 模型。 每个使用 @StreamListener 注解的方法都会收到自己的消息副本,每个消息都有自己的使用者组。
但是,如果在相同的绑定中使用 Spring Integration 的注释(例如 @Aggregator,*@Transformer* 或 @ServiceActivator),则会在竞争模型中消费消息,不会为每个订阅创建单个消费者组。

@StreamListener 注解使用

作为 Spring Integration 支持的补充,Spring Cloud Stream 提供了自己的 @StreamListener 注释,参考了其他 Spring Messaging 注解(*@MessageMapping@JamsListener@RabbitListener* 等),并提供了非常方便的功能,如基于内容的路由等。

@StreamListener 注解定义在方法主,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器。注解中的属性直对应监听的消息通道名。

1
2
3
4
5
6
7
8
9
10
11
@EnableBinding(Sink.class)
public class VoteHandler {

@Autowired
VotingService votingService;

@StreamListener(Sink.INPUT)//默认通道名为 input
public void handle(Vote vote) {
votingService.record(vote);
}
}

与其他 Spring Messaging 方法一样,方法参数可以使用 @Payload,*@Headers*和 @Header 进行注释。

若要将监听接收到的数据返回到输出目标,可以使用 @SendTo 注解指定目标来输出,如以下示例所示:

1
2
3
4
5
6
7
8
9
10
11
12
@EnableBinding(Processor.class)
public class TransformProcessor {

@Autowired
VotingService votingService;

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public VoteResult handle(Vote vote) {
return votingService.record(vote);
}
}

@StreamListener 内容路由

Spring Cloud Stream 支持根据条件将消息分派给使用 @StreamListener 注解的多个处理程序方法。为了支持条件调度,方法必须满足以下条件:

  • 不能返回值。
  • 必须是单独的消息处理方法(不支持反应式 API 方法)。

条件由注解的 condition 参数中的 SpEL 表达式指定,并对每条消息进行评估。与条件匹配的所有处理都在同一个线程中调用,并且不必假设调用的顺序。

在以下带有调度条件的 @StreamListener 示例中,头类型为 bogey 的所有消息都将被调度到 receiveBogey 方法,并且所有头类型为 bacall 的消息都将被调度到 receiveBacall 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
// handle the message
}

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
public void receiveBacall(@Payload BacallPojo bacallPojo) {
// handle the message
}
}

Spring Cloud Function 支持

从 Spring Cloud Stream v2.1 开始,定义 流处理器 的另一种选择是使用 Spring Cloud Function的内置支持,它们可以表示为 java.util.function.[Supplier/Function/Consumer] 类型的 bean。

要指定绑定到 bindings 所公开的外部目标的功能 bean,必须提供 spring.cloud.stream.function.definition 属性。如下示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyFunctionBootApp {

public static void main(String[] args) {
SpringApplication.run(MyFunctionBootApp.class, "--spring.cloud.stream.function.definition=toUpperCase");
}

@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
}

上面代码,简单地定义了一个名为 toUpperCasejava.util.function.Function 类型的 bean,用作消息处理程序器,其 inputoutput 必须绑定到由 Processor 绑定的公开的外部目标。

下面这些示例是支持 Source,ProcessorSink 的简单功能应用程序的示例。

以下是定义为 java.util.function.SupplierSource 应用示例

1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
@EnableBinding(Source.class)
public static class SourceFromSupplier {
public static void main(String[] args) {
SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
}
@Bean
public Supplier<Date> date() {
return () -> new Date(12345L);
}
}

以下是定义为 java.util.function.FunctionProcessor 应用示例

1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
@EnableBinding(Processor.class)
public static class ProcessorFromFunction {
public static void main(String[] args) {
SpringApplication.run(ProcessorFromFunction.class, "--spring.cloud.stream.function.definition=toUpperCase");
}
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
}

以下是定义为 java.util.function.ConsumerSink 应用示例

1
2
3
4
5
6
7
8
9
10
11
@EnableAutoConfiguration
@EnableBinding(Sink.class)
public static class SinkFromConsumer {
public static void main(String[] args) {
SpringApplication.run(SinkFromConsumer.class, "--spring.cloud.stream.function.definition=sink");
}
@Bean
public Consumer<String> sink() {
return System.out::println;
}
}

功能组合:
使用此编程模型,可以从功能组合中获益,可以将一组简单的功能动态组合成复杂的处理器。 示例:

1
2
3
4
@Bean
public Function<String, String> wrapInQuotes() {
return s -> "\"" + s + "\"";
}

并修改 spring.cloud.stream.function.definition 属性以反映将 toUpperCasewrapInQuotes 组合成一个新的功能。为此 Spring Cloud Function 允许用 | (管道) 符号。
修改后的属性如下:

1
-- spring.cloud.stream.function.definition=toUpperCase|wrapInQuotes

使用消费者轮询

使用轮询的消费者时,可以根据需要轮询 PollableMessageSource,如下示例:

1
2
3
4
5
6
7
8
9
public interface PolledConsumer {

@Input
PollableMessageSource destIn();

@Output
MessageChannel destOut();

}

对于上面示例中的轮询消费者,可以按如下方式使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
return args -> {
while (someCondition()) {
try {
if (!destIn.poll(m -> {
String newPayload = ((String) m.getPayload()).toUpperCase();
destOut.send(new GenericMessage<>(newPayload));
})) {
Thread.sleep(1000);
}
}
catch (Exception e) {
// handle failure
}
}
};
}

上面示例,PollableMessageSource.poll() 方法接受 MessageHandler 参数,如果收到并成功处理了消息,则返回 true。

与消息驱动的消费者一样,如果 MessageHandler 抛出异常,则将消息发布到 错误通道。通常,poll() 方法在 MessageHandler 退出时确认消息。 如果方法异常退出,则拒绝该消息(不重新排队),可以自定义确认后的处理,如以下示例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
return args -> {
while (someCondition()) {
if (!dest1In.poll(m -> {
StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
// e.g. hand off to another thread which can perform the ack
// or acknowledge(Status.REQUEUE)

})) {
Thread.sleep(1000);
}
}
};
}

重要:必须在某个时刻 acknack(确认 or 无应答) 消息,以避免资源泄漏。

重要:某些消息中间件(例如 Kafka) 在日志中维护一个简单的偏移量。如果传递失败并使用 StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE) 重新排队,则会重新传递任何以后成功获取的消息。

还有一个重载的poll方法,其定义如下:

1
poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type 是转换提示,允许转换传入的消息有效内容,如以下示例所示:

1
2
3
4
5
boolean result = pollableSource.poll(received -> {
Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
...

}, new ParameterizedTypeReference<Map<String, Foo>>() {});

错误处理:

默认情况下,为轮询资源配置了一个 错误通道,如果回调引发异常会向错误通道 (<destination>.<group>.errors) 发送一条 ErrorMessage。此错误通道会桥接到全局的 Spring Integration errorChannel。

还可以使用 @ServiceActivator 订阅错误通道来处理错误;如果没有订阅,则只会记录错误并确认消息成功。如果错误通道 service activator 抛出异常,则消息将被拒绝(默认情况下),并且不会重新传递该消息。

如果 service activator 抛出 RequeueCurrentMessageException 异常,则消息将在代理处重新排队,并将在后续轮询中再次检索。

如果 listener 直接抛出 RequeueCurrentMessageException,则消息将被重新排队,不会被发送到错误通道。

错误处理

发生错误,Spring Cloud Stream 提供了灵活的机制来处理它们,主要有两种形式:

  • application(应用层):错误处理在应用内完成(即自定义错误处理)。
  • system(系统层):将错误处理委托给绑定器(重新排队,DL 或其他)。注意,这些技术取决于绑定器实现和底层消息中间件的功能。

Spring Cloud Stream 使用 Spring Retry 库来促进成功的消息处理。当全部失败时,消息处理程序抛出的异常将传播回绑定器。 此时,Binder 调用自定义错误处理程序或将错误传回消息中间件(重新队列,DLQ和其他)。

应用错误处理

应用级错误处理有两种类型,可以在每个绑定订阅处理错误,或者全局处理器可以处理所有绑定订阅错误。 如下图:

Spring Cloud Stream Sink Application with Custom and Global Error Handlers

对于每个输入绑定,Spring Cloud Stream 会使用 <destinationName>.errors 创建专用的错误通道。

<destinationName> 由绑定的名称(如 input) 和组名称(如 myGroup)组成。

如下示例:

1
spring.cloud.stream.bindings.input.group=myGroup
1
2
3
4
5
6
7
8
9
@StreamListener(Sink.INPUT) // destination name 'input.myGroup'
public void handle(Person value) {
throw new RuntimeException("BOOM!");
}

@ServiceActivator(inputChannel = Processor.INPUT + ".myGroup.errors") //channel name 'input.myGroup.errors'
public void error(Message<?> message) {
System.out.println("Handling ERROR: " + message);
}

上面的示例,目标名是 input.myGroup,处理错误的通道名是 input.myGroup.errors

注意:*@StreamListener* 注解专门用于定义桥接内部通道和外部目标的绑定。 因为目标特定错误通道没有关联的外部目标,此类通道是 Spring Integration(SI)的特权,意味着必须使用 SI 处理器注解之一来定义此类目标的处理程序(即 @ServiceActivator,@Transformer等)。

注意:如果未指定 Group,则使用匿名组(类似于 input.anonymous.2K37rb06Q6m2r51-SPIDDQ ),但这种不适合错误处理场景,因为在创建目标之前不知道它是什么。

此外,如果绑定现有目标,如下:

1
2
spring.cloud.stream.bindings.input.destination=myFooDestination
spring.cloud.stream.bindings.input.group=myGroup

则完整的目标名称是 myFooDestination.mygroup,专用的错误通道名称是 myFooDestination.mygroup.errors

回到上面例子:

订阅了名为 input 的通道的 handle(..) 方法抛出异常。 如果还存在对错误通道 input.myGroup.errors 的订阅,此订阅将处理所有错误消息。如果有多个绑定,则可能需要一个错误处理器。Spring Cloud Stream 通过将每个错误通道桥接到名为errorChannel 的通道,自动为全局错误通道提供支持,允许单个订阅处理所有错误,如以下示例所示:

1
2
3
4
@StreamListener("errorChannel")
public void error(Message<?> message) {
System.out.println("Handling ERROR: " + message);
}

如果错误处理逻辑相同,无论哪个处理器产生的错误,这都会是一个方便的选项。

系统错误处理

系统级错误处理意味着将错误传递回消息中间件,并且考虑到每个消息中间件并非都相同,则功能可能因绑定器而异。

也就是说,在本节中,将解释系统级错误处理背后的一般思想,并以 Rabbit binder 为例。注意:Kafka binder 提供了类似的支持,虽然某些配置属性有所不同。

如果未配置内部错误处理器,则错误会传播到绑定器,然后绑定器会将这些错误传播回消息中间件。根据消息中间件所支持的功能,可能会丢弃消息,重新排队消息以进行重新处理或将失败的消息发送到 DLQ。 Rabbit 和 Kafka 都支持这些概念。但是,其他绑定器可能不会。

Drop Failed Messages(丢弃消息)

默认情况下,如果未提供其他系统级配置,则消息中间件将丢弃失败的消息,少数情况下可接受,但在大多数情况下是不可接受的,就需要一些恢复机制来避免消息丢失。

DLQ - Dead Letter Queue(死信队列)

DLQ 允许失败的消息发送到特定的目标:- Dead Letter Queue(死信队列)。

有配置后,失败的消息将发送到此目标,以便后续重新处理或审核和协调。

例如,继续上一个示例并使用 Rabbit binder 设置DLQ,您需要设置以下属性:

1
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true

注意,上面属性中,input 对应输入目上标绑定的名称。consumer 指示这示一个消费者属性,auto-bind-dlq 指示绑定器为输入目标配置 DLQ,这将创建一个名为 input.myGroup.dlq 的 Rabbit 队列。

配置了死信队列后,所有失败消息将被路由到此队列,错误消息类似于:

1
2
3
4
5
6
7
8
9
10
delivery_mode:	1
headers:
x-death:
count: 1
reason: rejected
queue: input.hello
time: 1522328151
exchange:
routing-keys: input.myGroup
Payload {"name”:"Bob"}

从上面内容可以看出,原始消息将被保存以供进一步操作。但这些内容对于消息处理的原始问题的信息是有限的,例如,无法看到与原始错误对应的堆栈跟踪。 要获取有关原始错误的更多相关信息,您必须设置其他属性:

1
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true

添加了上面配置后,会强制内部错误处理器拦截错误信息,并在将其发布到 DLQ 之前向其添加附加信息。如下示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
delivery_mode:	2
headers:
x-original-exchange:
x-exception-message: has an error
x-original-routingKey: input.myGroup
x-exception-stacktrace: org.springframework.messaging.MessageHandlingException: nested exception is
org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
at. . . . .
Payload {"name”:"Bob"}

这样就有效地结合了应用级和系统级错误处理,为下游故障排除机制提供帮助。

Re - queue Failed Messages(失败重新排队)

目前支持的绑定器(RabbitMq 和 Kafka)依赖于 RetryTemplate 来成功的消息处理。但于,对于 max-attempts 属性设置为 1 的情况,将禁用内部重新处理。此时,可以配置消息中间件对失败的消息进行重新排队以便重新处理(重新尝试),重新排队后,失败的消息将被发送回原始处理器,实质上是创建重试循环。

该选项对于因某些原因导致消息在短期不可用的情况是可行的,要实现此操作,需要做如下配置:

1
2
spring.cloud.stream.bindings.input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

在上面的配置示例中,max-attempts 设置为 1 禁用了内部重试,requeue-rejected(重新排队被拒绝的消息)设置为true。 一旦这样设置,失败的消息将重新提交到同一个处理器并连续循环或直到处理程序抛出AmqpRejectAndDontRequeueException,本质上允许在处理器本身内构建自己的重试逻辑。

Retry Template(重试模板)

RetryTemplate 是 Spring Retry 库的一部分,与 RetryTemplate 特别相关的消费者属性如下:

  1. maxAttempts:默认值 3。重试处理消息次数,包含第 1 次,设置为 1 时表示不重试。
  2. backOffInitialInterval:默认值 1000 毫秒。重试时回退初始间隔。
  3. backOffMaxInterval:默认值 1000 毫秒。重试最大回退间隔。
  4. backOffMultiplier:默认值 2.0。重试回限乘数。
  5. defaultRetryable:默认值 true。监听器抛出未在 retryableExceptions 中的异常是否重试。
  6. retryableExceptions:默认值 空。一个 Map,存放 Throwable 类名和布尔值,指定发生这些异常(包括子类)是否重试。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

这些属性位于 spring-cloud-stream 包下的 org.springframework.cloud.stream.binder.ConsumerProperties 消费者属性文件中。

虽然前面的设置足以满足大多数自定义要求,但它们可能无法满足某些复杂要求,则需要自定义 RetryTemplate 实例并注册为 Bean。 此外,为了避免冲突,并在 RetryTemplate 实例上使用 @StreamRetryTemplate 注解标识由绑定器使用。如下示例:

1
2
3
4
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
return new RetryTemplate();
}

上面创建 RetryTemplate 实例,不需要使用 @Bean 注解,因为已经包含在 @StreamRetryTemplate 注解中了。

Spring Cloud(十七):消息驱动之Spring Cloud Stream 编程模型

http://blog.gxitsky.com/2019/05/13/SpringCloud-17-stream-programming-model/

作者

光星

发布于

2019-05-13

更新于

2022-06-17

许可协议

评论