currently I'm working with Kafka / Zookeeper and pySpark (1.6.0).
I have successfully created a kafka consumer, which is using the KafkaUtils.createDirectStream()
.
There is no problem with all the streaming, but I recognized, that my Kafka Topics are not updated to the current offset, after I have consumed some messages.
Since we need the topics updated to have a monitoring here in place this is somehow weird.
In the documentation of Spark I found this comment:
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
for o in offsetRanges:
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
directKafkaStream
.transform(storeOffsetRanges)
.foreachRDD(printOffsetRanges)
You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
Here is the documentation: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
I found a solution in Scala, but I can't find an equivalent for python. Here is the Scala example: http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
Question
But the question is, how I'm able to update the zookeeper from that point on?
See Question&Answers more detail:os