I am trying to run simple variations of the examples from the official Spark tutorial and from the book "Spark Streaming in Action".
The exception messages are strange. What is wrong with my code?
First I start Kafka: ZooKeeper, the broker, a producer and two consumers. Then I run the following code:
```scala
import org.apache.spark.sql.streaming.OutputMode

// read from kafka
val df = sparkService.sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic1)
  .option("failOnDataLoss", "false") // only when testing; this is a Kafka source option
  .load()

// write to kafka
import sparkService.sparkSession.implicits._
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .outputMode(OutputMode.Append())
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", topic2)
  .option("checkpointLocation", "/home/pt/Dokumenty/tmp/")
  .start()

query.awaitTermination(30000) // wait up to 30 s
```
The error occurs when writing to Kafka:
```
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got 1 1609627750463
```
question from: https://stackoverflow.com/questions/65646194/stream-from-kafka-to-kafka-with-spark-structured-streaming-in-scala
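For comparison, here is a minimal self-contained sketch of the same Kafka-to-Kafka pipeline. It assumes Spark 3.x with the `spark-sql-kafka-0-10` package on the classpath and a broker on `localhost:9092`. The object `KafkaToKafka`, the helper `checkpointFor`, the topic names `topic1`/`topic2` and the `/tmp/checkpoints` path are hypothetical placeholders. In this sketch `failOnDataLoss` is set on the reader, because it is a Kafka source option, and the query gets its own checkpoint directory. Spark stores the query's source offsets under the checkpoint location, so a folder shared with, or left over from, another query may hold offsets that the Kafka source cannot parse, which may be related to the error above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

object KafkaToKafka {

  // Hypothetical helper: one checkpoint subdirectory per streaming query,
  // instead of pointing queries at a general-purpose folder such as tmp/.
  def checkpointFor(baseDir: String, queryName: String): String =
    s"${baseDir.stripSuffix("/")}/$queryName"

  // Builds and starts the read-from-Kafka / write-to-Kafka query.
  def start(spark: SparkSession,
            bootstrap: String,
            inTopic: String,
            outTopic: String,
            checkpoint: String): StreamingQuery =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("subscribe", inTopic)
      // failOnDataLoss is read by the Kafka source, so it is set on the reader
      .option("failOnDataLoss", "false")
      .load()
      // the Kafka sink expects a "value" column (and optionally "key")
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .outputMode(OutputMode.Append())
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("topic", outTopic)
      .option("checkpointLocation", checkpoint)
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-to-kafka")
      .master("local[*]")
      .getOrCreate()

    // Placeholder topics and paths: adjust to your own setup.
    val query = start(
      spark,
      bootstrap = "localhost:9092",
      inTopic = "topic1",
      outTopic = "topic2",
      checkpoint = checkpointFor("/tmp/checkpoints", "kafka-to-kafka"))

    query.awaitTermination(30000) // wait up to 30 s, then shut down
    query.stop()
    spark.stop()
  }
}
```

If the subscribed topics or the shape of the query change between runs, deleting the checkpoint directory (or choosing a new query name) is usually the simplest way to start from a clean state.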