Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I'm currently struggling handling serialization exceptions properly in a kafka stream application. Using the latest version. Exception handling for deserialization and production exceptions are fine. There I'm using

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyCustomDeserializationExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, MyCustomProductionExceptionHandler.class);

I'm using AVRO as output format and having define such a serde:

private SpecificAvroSerde<MyClass> avroSerde(Properties envProps) {
  SpecificAvroSerde<MyClass> avroSerde = new SpecificAvroSerde<>();
  final HashMap<String, String> serdeConfig = new HashMap<>();
  serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
  avroSerde.configure(serdeConfig, false);
  return avroSerde;
}

If for example a required value is not set we obviously get an exception. But as we are dynamically mapping input values it's not really in our hand if we get correct values. An exception looks like

Error encountered sending record to topic NAME for task 0_0 due to:
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic NAME for task 0_0 due to:
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
    at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:167)
    at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:129)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)

Finally the question/problem. How can I handle such serialization exceptions properly? I would like to log the error and not having the stream failing.

Thanks in advance!

question from:https://stackoverflow.com/questions/66059296/how-to-handle-serialization-exception-with-kafka-stream

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
133 views
Welcome To Ask or Share your Answers For Others

1 Answer

Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...