我正在尝试将 spring amqp 配置为仅重试消息定义的次数.当前一条消息失败,例如因为 DataIntegrityViolationException 被无限期重新交付.
I am trying configure spring amqp to only retry a message a defined amount of times. Currently a message that fails e.g. because of a DataIntegrityViolationException is redelivered indefinitely.
根据文档 这里我想出了以下配置
According to the documentation here I came up with the following configuration
@Bean
public StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor() {
return RetryInterceptorBuilder.stateful()
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.maxAttempts(3)
.messageKeyGenerator(message -> UUID.randomUUID().toString())
.build();
}
这似乎没有应用 - 消息仍在无限期地尝试.
This does not seem to be applied - the messages are still tried indefinitely.
感觉我在这里错过了什么.
Feels like I am missing something here.
这是我关于 AMQP 的剩余配置:
Here is my remaining configuration regarding AMQP:
@Bean
Queue testEventSubscriberQueue() {
final boolean durable = true;
return new Queue("testEventSubscriberQueue", durable);
}
@Bean
Binding binding(TopicExchange topicExchange) {
return BindingBuilder.bind(testEventSubscriberQueue()).to(topicExchange).with("payload.event-create");
}
@Bean
SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(testEventSubscriberQueue().getName());
container.setMessageListener(listenerAdapter);
container.setChannelTransacted(true);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(MessageConverter messageConverter, SubscriberHandler subscriberHandler) {
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(subscriberHandler);
listenerAdapter.setMessageConverter(messageConverter);
return listenerAdapter;
}
@Bean
public MessageConverter messageConverter(ObjectMapper objectMapper) {
final Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
jsonMessageConverter.setJsonObjectMapper(objectMapper);
DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
defaultClassMapper.setDefaultType(EventPayload.class);
jsonMessageConverter.setClassMapper(defaultClassMapper);
final ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter(jsonMessageConverter);
messageConverter.addDelgate(MessageProperties.CONTENT_TYPE_JSON, jsonMessageConverter);
return messageConverter;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
//rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
public TopicExchange testExchange() {
final boolean durable = true;
final boolean autoDelete = false;
return new TopicExchange(EXCHANGE_NAME, durable, autoDelete);
}
我正在使用 spring-amqp 1.5.1.RELEASE.
I am using spring-amqp 1.5.1.RELEASE.
感谢任何帮助.
你需要配置容器将拦截器添加到它的通知链中...
You need to configure the container to add the interceptor to its advice chain...
container.setAdviceChain(new Advice[] { statefulRetryOperationsInterceptor() });
这篇关于未使用 Spring AMQP StatefulRetryOperationsInterceptor的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持html5模板网!
解析 ISO 8601 字符串本地日期时间,就像在 UTC 中Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期时间,就像在 UTC 中一样)
如何将公历字符串转换为公历?How to convert Gregorian string to Gregorian Calendar?(如何将公历字符串转换为公历?)
Java:GregorianCalendar 的最大值和最小值是什么/在哪Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
1582 年 10 月 15 日之前日期的日历到日期转换.公历Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日历到日期转换
java日历setFirstDayOfWeek不起作用java Calendar setFirstDayOfWeek not working(java日历setFirstDayOfWeek不起作用)
Java:获取当前星期几的值Java: getting current Day of the Week value(Java:获取当前星期几的值)