If an Error is thrown in the application's @KafkaListener listener code, such as an OOM, Spring Kafka catches it in MessagingMessageListenerAdapter and allows processing to continue, so the offsets are committed and the message is lost. (Unlike an Exception, where the offsets are not committed and the message is retried.) What is the rationale for this?
If it is an Error, the chances are that the condition is serious enough that a retry is not worth it. In your specific example of OOM, processing stops and the application may shut down; there is no point in letting it go through retries. Where do you see that the offsets are committed in this case? The application will likely shut down where the Error occurred. Errors in Java represent severe, typically unrecoverable conditions. When an OOM or other JVM Error occurs, the application is unstable and retries are pointless, so Spring Kafka fails fast in this case. Even if the application lingers and moves on to the next step, things are still unstable. I'm curious to find out if you saw an offset commit take place, and where you see the message failure. Thanks!
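As background on why Errors get special treatment at all: in Java, a catch (Exception e) clause never matches an Error, because Exception and Error sit on separate branches of the Throwable hierarchy. A minimal standalone illustration (the class and method names here are made up for the demo, this is not Spring Kafka code):

```java
// Hypothetical standalone demo (NOT Spring Kafka code) showing that
// catch (Exception e) does not match a thrown Error: Exception and Error
// are sibling subtypes of Throwable.
public class ErrorVsException {
    static String classify(Runnable r) {
        try {
            r.run();
            return "ok";
        } catch (Exception e) {
            return "exception"; // RuntimeException and checked exceptions land here
        } catch (Error e) {
            return "error";     // OutOfMemoryError, StackOverflowError, ... land here
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(() -> { throw new IllegalStateException("boom"); }));
        // -> exception
        System.out.println(classify(() -> { throw new OutOfMemoryError("simulated"); }));
        // -> error
    }
}
```

So any framework that wants to let Errors propagate (or handle them as fatal) has to catch them explicitly, or catch Throwable and rethrow the Error branch.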
I am working with @directlx, they were kind enough to create an issue here for me when I brought the problem to them.
It is not entirely clear to me where in spring-kafka's code the offsets are getting committed. Based on what I have read elsewhere, I am guessing it happens once control returns to the main polling loop and the next poll occurs, but I was only tracing through spring-kafka's code to find where the error was being caught, because I hit a dead end reading the documentation. I am not familiar with the code.
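For intuition, here is a simplified, self-contained sketch of the sequencing I mean; the names are made up and this is not spring-kafka's actual code. If the listener invocation returns normally (for instance because a lower layer swallowed the failure), the loop treats the record as processed and advances the committed offset:

```java
import java.util.List;

// Simplified, hypothetical sketch of a consumer poll loop (NOT spring-kafka's
// actual code): once the listener call completes without propagating anything,
// the loop considers the record processed and commits its offset.
public class PollLoopSketch {
    static long committedOffset = -1;

    // Stand-in for the listener invocation; imagine a lower layer here
    // swallowing any Error the application code throws.
    static void invokeListener(long offset) {
        // application work would happen here
    }

    static void pollLoop(List<Long> polledOffsets) {
        for (long offset : polledOffsets) {
            invokeListener(offset);   // returns normally
            committedOffset = offset; // so the offset gets committed
        }
    }

    public static void main(String[] args) {
        pollLoop(List.of(0L, 1L, 2L));
        System.out.println("last committed offset: " + committedOffset);
        // -> last committed offset: 2
    }
}
```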
Here is what I am observing in my testing. Take this example listener
@KafkaListener(
    id = "exampleCrashingListener",
    topics = "exampleCrashingTopic",
    groupId = "exampleGroup",
    containerFactory = "integerKafkaListenerContainerFactory"
)
public void exampleCrashingListener(int requestID)
{
    logger.info("Pretending to start work for requestID={}", requestID);
    try
    {
        Thread.sleep(30000);
    }
    catch (InterruptedException e)
    {
        logger.warn("Sleep that emulates work of exampleCrashingListener was interrupted!");
    }
    if (true)
    {
        logger.info("Crashing (throwing OutOfMemoryError) for requestID={}", requestID);
        throw new OutOfMemoryError();
    }
    logger.info("Work finished for requestID={}", requestID);
}
configured like this
@Bean
public ConsumerFactory<String, Integer> integerConsumerFactory()
{
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroup");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Integer> integerKafkaListenerContainerFactory()
{
    ConcurrentKafkaListenerContainerFactory<String, Integer> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(integerConsumerFactory());
    return factory;
}

@Bean
public NewTopic crashingTopic()
{
    return TopicBuilder.name("exampleCrashingTopic")
        .partitions(1)
        .replicas(1)
        .build();
}
When I send a message to the topic, this is what I see in the logs
2025-04-11 09:57:21,211 INFO [exampleCrashingListener-0-C-1] example.kafka.Listeners: Pretending to start work for requestID=1
2025-04-11 09:57:27,216 INFO [exampleCrashingListener-0-C-1] example.kafka.Listeners: Crashing (throwing OutOfMemoryError) for requestID=1
If I send a second request, I get this in the logs (directly after what is above): the listener is still servicing requests despite an Error being thrown, and it never redelivers request 1.
2025-04-11 09:57:38,021 INFO example.kafka.Listeners [exampleCrashingListener-0-C-1] Pretending to start work for requestID=2
2025-04-11 09:57:44,034 INFO example.kafka.Listeners [exampleCrashingListener-0-C-1] Crashing (throwing OutOfMemoryError) for requestID=2
If I forcibly stop and restart the service after the crashing line, I see nothing in the logs on restart (since the offsets are committed).
I recognize that this test is relatively artificial, so I also tried replacing the throw new OutOfMemoryError() line with this
int dummyArraySize = 15;
logger.info("Max JVM memory: " + Runtime.getRuntime().maxMemory());
long memoryConsumed = 0;
try
{
    long[] memoryAllocated = null;
    for (int loop = 0; loop < Integer.MAX_VALUE; loop++)
    {
        memoryAllocated = new long[dummyArraySize];
        memoryAllocated[0] = 0;
        memoryConsumed += dummyArraySize * Long.SIZE;
        logger.info("Memory Consumed till now: " + memoryConsumed);
        dummyArraySize *= dummyArraySize * 2;
    }
}
catch (OutOfMemoryError outOfMemory)
{
    logger.info("Catching out of memory error");
    throw outOfMemory;
}
with effectively the same result as the artificial test.
2025-04-11 10:49:05,043 INFO [exampleCrashingListener-0-C-1] example.kafka.Listeners: Pretending to start work for requestID=14
2025-04-11 10:49:35,055 INFO [exampleCrashingListener-0-C-1] example.kafka.Listeners: Crashing requestID=14
2025-04-11 10:49:35,056 INFO [exampleCrashingListener-0-C-1] example.kafka.Listeners: Max JVM memory: 12817793024
2025-04-11 10:49:35,056 INFO [exampleCrashingListener-0-C-1] example.kafka.Listeners: Memory Consumed till now: 960
2025-04-11 10:49:35,057 INFO [exampleCrashingListener-0-C-1] example.kafka.Listeners: Memory Consumed till now: 29760
2025-04-11 10:49:35,057 INFO [exampleCrashingListener-0-C-1] example.kafka.Listeners: Memory Consumed till now: 25949760
2025-04-11 10:49:35,268 INFO [exampleCrashingListener-0-C-1] example.kafka.Listeners: Catching out of memory error
Again, additional messages sent afterwards are processed immediately, restarting the service does not reprocess the message that crashed, and so on.
By talking to the Kafka broker we can see the commit happen. I am executing
Thank you for such a comprehensive analysis, @ryanswech!
Looks like you have found a bug.
The KafkaMessageListenerContainer.ListenerConsumer has logic like this in its run() method, where the target message listener is called:
catch (Error e) { // NOSONAR - rethrown
    this.logger.error(e, "Stopping container due to an Error");
    this.fatalError = true;
    wrapUp(e);
    throw e;
}
However, due to the MessagingMessageListenerAdapter.invoke() logic mentioned above, we never receive that Error back in the container.
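To illustrate why that catch (Error e) never fires, here is a small self-contained sketch; the class and method names are hypothetical and this is not Spring Kafka's code. An intermediate layer that catches Throwable without rethrowing the Error branch makes the invocation return normally, so the outer loop simply continues:

```java
// Hypothetical minimal reproduction (NOT Spring Kafka code) of the swallowing
// behavior: an adapter layer catches Throwable, so the container's outer
// catch (Error e) is never reached and the processing loop keeps going.
public class ErrorSwallowDemo {
    static int invocations = 0;

    // Stand-in for the application's @KafkaListener method, which throws an Error.
    static void listener(int id) {
        invocations++;
        throw new OutOfMemoryError("simulated");
    }

    // Stand-in for an adapter that catches Throwable and does not rethrow Errors.
    static void adapterInvoke(int id) {
        try {
            listener(id);
        } catch (Throwable t) {
            System.out.println("adapter swallowed: " + t);
        }
    }

    public static void main(String[] args) {
        boolean containerStopped = false;
        for (int id = 1; id <= 2; id++) {
            try {
                adapterInvoke(id); // returns normally, so the loop keeps going
            } catch (Error e) {
                containerStopped = true; // never reached
                break;
            }
        }
        System.out.println("invocations=" + invocations
                + " containerStopped=" + containerStopped);
        // -> invocations=2 containerStopped=false
    }
}
```

This matches the observed behavior in the logs above: both requests are serviced, no redelivery happens, and the container never enters its fatal-error path.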