I'm currently struggling to handle serialization exceptions properly in a Kafka Streams application (using the latest version). Exception handling for deserialization and production exceptions works fine; there I'm using
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyCustomDeserializationExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, MyCustomProductionExceptionHandler.class);
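For reference, such a handler is a plain implementation of the corresponding Kafka Streams interface. A minimal sketch of the production handler (the class name matches the config above; the logging is just a placeholder) might look like:

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Minimal sketch: log the failed record and keep the stream alive.
public class MyCustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        System.err.println("Failed to produce to topic " + record.topic()
                + ": " + exception.getMessage());
        // CONTINUE skips the record instead of failing the stream thread
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no extra configuration needed
    }
}
```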
I'm using Avro as the output format and have defined the serde like this:
private SpecificAvroSerde<MyClass> avroSerde(Properties envProps) {
    final SpecificAvroSerde<MyClass> avroSerde = new SpecificAvroSerde<>();
    final HashMap<String, String> serdeConfig = new HashMap<>();
    serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
    avroSerde.configure(serdeConfig, false); // false = configure as a value serde
    return avroSerde;
}
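For context, the serde is wired into the topology at the sink, which is where the serialization exception is thrown. A rough sketch (topic names and key type are placeholders, not my actual topology):

```java
// Hypothetical wiring; "input-topic" / "output-topic" and the String key are placeholders.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, MyClass> stream = builder.stream("input-topic");
stream.to("output-topic", Produced.with(Serdes.String(), avroSerde(envProps)));
```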
If, for example, a required value is not set, we obviously get an exception. But since we map input values dynamically, it's not really in our hands whether we receive correct values. The exception looks like:
Error encountered sending record to topic NAME for task 0_0 due to:
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message

org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic NAME for task 0_0 due to:
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:129)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
Finally, the question/problem: how can I handle such serialization exceptions properly? I would like to log the error without the stream failing.
Thanks in advance!
question from:https://stackoverflow.com/questions/66059296/how-to-handle-serialization-exception-with-kafka-stream