Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I am trying to run simple variations of examples from the official Spark tutorial and the book "Spark Streaming in Action".
The exception message is strange. What is wrong with my code?

First I start Kafka's ZooKeeper, the Kafka server, a producer, and 2 consumers. Then I run the following code:

import org.apache.spark.sql.streaming.OutputMode

// Read from Kafka
val df = sparkService.sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic1)
    .load()

// write to kafka
import sparkService.sparkSession.implicits._

val query = df.selectExpr("CAST(key as STRING)", "CAST(value as STRING)")
    .writeStream
    .outputMode(OutputMode.Append())
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", topic2)
    .option("checkpointLocation", "/home/pt/Dokumenty/tmp/")
    .option("failOnDataLoss", "false") // only when testing
    .start()

query.awaitTermination(30000) // block for at most 30 seconds (timeout is in milliseconds)

The error occurs when writing to Kafka:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got 1 1609627750463
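
One possible reading of this exception, offered as an assumption rather than a confirmed diagnosis: the "Expected e.g. {...}, got ..." text is what Spark's Kafka source reports when it cannot parse a serialized offset as per-topic partition JSON. The code above does not set startingOffsets, so the unparseable text may come from files already stored under the checkpointLocation, for example if /home/pt/Dokumenty/tmp/ was used earlier by a different query or holds unrelated files. A minimal sketch for testing that idea is to give the query a new, empty checkpoint directory. The CheckpointDirs object and its fresh method are hypothetical names introduced only for this illustration.

```scala
import java.nio.file.{Files, Path}

// Hypothetical helper (not part of the original question): creates a new,
// empty temporary directory, so the query cannot pick up offset files
// written by an earlier or unrelated run.
object CheckpointDirs {
  def fresh(prefix: String): Path =
    Files.createTempDirectory(prefix)
}

// Assumed usage with the writer from the question:
//   .option("checkpointLocation", CheckpointDirs.fresh("kafka-to-kafka-").toString)
```

A temporary directory is only suitable while testing, because the query loses its progress on every restart. If the error disappears with an empty directory, a stable directory dedicated to this one query would be the longer-term choice.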

Question from: https://stackoverflow.com/questions/65646194/stream-from-kafka-to-kafka-with-spark-structured-streaming-in-scala


1 Answer

Waiting for answers
