Spring RabbitMQ MessageRejectedWhileStoppingException: Message listener container was stopping when a message was received
I'm using Spring RabbitMQ with DirectMessageListenerContainer and encountering a MessageRejectedWhileStoppingException
when my application shuts down. Here's my configuration:
@Configuration
public class RabbitConfig {
@Bean
public DirectMessageListenerContainer messageListenerContainer(
ConnectionFactory connectionFactory,
MessageListenerAdapter messageListenerAdapter) {
DirectMessageListenerContainer container = new DirectMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("my-queue");
container.setMessageListener(messageListenerAdapter);
container.setConsumersPerQueue(2);
container.setPrefetchCount(1);
return container;
}
@Bean
public MessageListenerAdapter messageListenerAdapter(MyMessageHandler handler) {
return new MessageListenerAdapter(handler, "handleMessage");
}
}
During application shutdown, I see this error:
org.springframework.amqp.rabbit.listener.exception.MessageRejectedWhileStoppingException: Message listener container was stopping when a message was received
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListenerAndHandleException(AbstractMessageListenerContainer.java:1532)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.lambda$executeListener$8(AbstractMessageListenerContainer.java:1518)
How can I prevent this exception during graceful shutdown?
Solution
The MessageRejectedWhileStoppingException
occurs when messages arrive while the DirectMessageListenerContainer is in the process of stopping. This is a common issue during application shutdown. Here are the solutions:
1. Configure graceful shutdown timeout
The most effective approach is to configure a proper shutdown timeout that allows the container to finish processing current messages:
@Configuration
public class RabbitConfig {
@Bean
public DirectMessageListenerContainer messageListenerContainer(
ConnectionFactory connectionFactory,
MessageListenerAdapter messageListenerAdapter) {
DirectMessageListenerContainer container = new DirectMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("my-queue");
container.setMessageListener(messageListenerAdapter);
container.setConsumersPerQueue(2);
container.setPrefetchCount(1);
// Configure graceful shutdown
container.setShutdownTimeout(30000); // 30 seconds
container.setForceCloseChannel(false);
return container;
}
}
2. Add shutdown hook configuration
You can also configure the shutdown behavior in your application properties:
# application.yml
spring:
rabbitmq:
listener:
direct:
shutdown-timeout: 30s
force-close-channel: false
3. Implement proper message acknowledgment
Ensure your message handler properly acknowledges messages to prevent them from being requeued during shutdown:
@Component
public class MyMessageHandler {
public void handleMessage(Message message, Channel channel) {
try {
// Process your message
processMessage(message);
// Acknowledge the message
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
try {
// Reject and don't requeue if there's an error
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException ex) {
// Handle acknowledgment error
}
}
}
}
4. Use @RabbitListener with proper configuration
If you're using @RabbitListener
, configure it with shutdown handling:
@Component
public class MyMessageHandler {
@RabbitListener(queues = "my-queue",
containerFactory = "rabbitListenerContainerFactory")
public void handleMessage(Message message) {
// Your message processing logic
processMessage(message);
}
}
@Configuration
public class RabbitConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setShutdownTimeout(30000);
factory.setForceCloseChannel(false);
return factory;
}
}
The key is setting an appropriate shutdownTimeout
that gives your application enough time to finish processing messages before the container stops accepting new ones.