Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I am facing some errors during serialization of KStream to KTable when I introduced the groupBy to my KStream. As I understood, once you have an aggregate or reduce on a KStream, Kafka tries to transform it to a KTable due to necessary shuffle and because of this Kafka has to serialize the records again. So, my original KStream was just mapping the records from JSON to AVRO like this, and it is working fine.

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .mapValues(v -> recordBuilder.getNotificationAvro(v));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));
        return notificationAvroKStream;
    }

then I introduced the groupByKey and reduce and I realized that it transforms to a KTable and hence it needed Serdes on the application.yaml file. But unfortunately I cannot configure the default Serdes because I have other types of serialization. Hence I decided to serialize on the KTable topology. I am trying to implement this solution based on this answer.

The part of the code that I try to materialize with my custom serdes is not working properly (Materialized.with(CustomSerdes.String(), CustomSerdes.NotificationAvro())). First, I don't think I need KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");, but without this it also does not work and I cannot find a materialized that is not KeyValueBytes... where I can define my serdes CustomSerdes.String(), CustomSerdes.NotificationAvro().

According to the answer that I mentioned on the link, they also use a final StreamsBuilder builder = new StreamsBuilder();. But since I am computing it using spring-kafka I don't have this option, or if I have I don't know how to use.

@Service
@Slf4j
@EnableBinding(PosListenerJsonAvroBinding.class)
public class NotificationJsonAvroProcessorService {
    @Autowired
    RecordBuilder recordBuilder;

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));

        // *********************************************
        // IS THERE A KeyValueStoreSupplier THAT I CAN PASS ALSO MY SERDES INSTEAD OF Bytes?
        // KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
        KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
                .toTable(
                        // *********************************************
                        // HOW TO MATERIALIZE KTABLE VALUES WITH SERDES ?
                        Materialized
                                // .as(storeSupplier) // this is not necessary
                                .with(CustomSerdes.String(), CustomSerdes.NotificationAvro())
                        // *********************************************
                )
                .groupBy((cardNo, notificationAvro) -> KeyValue.pair(cardNo, notificationAvro))
                .reduce(
                        (aggValue, newValue) -> {
                            newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                            return newValue;
                        },
                        (aggValue, oldValue) -> oldValue
                );
        KStream<String, NotificationAvro> notificationAggAvroKStream = convertedTable.toStream();
        notificationAggAvroKStream.foreach((k, v) -> log.info(String.format("Notification agg avro - key: %s, value: %s", k, v)));

        return notificationAggAvroKStream;
    }
}

the custom serdes:

@Service
public class CustomSerdes extends Serdes {
    private static final String schema_registry_url = "http://localhost:8081";
    private final static Map<String, String> serdeConfig = Collections
            .singletonMap("schema.registry.url", schema_registry_url);
    public static Serde<NotificationAvro> NotificationAvro() {
        final Serde<NotificationAvro> notificationAvroSerde = new SpecificAvroSerde<>();
        notificationAvroSerde.configure(serdeConfig, false);
        return notificationAvroSerde;
    }
}

and the error:

Exception in thread "NotificationJsonAvroProcessorService-process-applicationId-3e262d96-19ca-438d-a2b8-9d3c2e9bb4ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic NotificationJsonAvroProcessorService-process-applicationId-KTABLE-AGGREGATE-STATE-STORE-0000000010-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))). ... ... Caused by: java.lang.ClassCastException: class com.github.felipegutierrez.explore.spring.model.NotificationAvro cannot be cast to class java.lang.String (com.github.felipegutierrez.explore.spring.model.NotificationAvro is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
243 views
Welcome To Ask or Share your Answers For Others

1 Answer

So, I solved through reading this answer, which is using the .groupByKey(...) and Serialized.with(...) that are deprecated. I am using the Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()).

KStream<String, NotificationAvro> notificationAvroKStream = input
     .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
     .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)))
     .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
     .reduce((aggValue, newValue) -> {
          newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
          return newValue;
     })
     .toStream();
notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro agg - key: %s, value: %s", k, v)));
return notificationAvroKStream;

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

548k questions

547k answers

4 comments

86.3k users

...