How to save the latest offset that Spark consumed to ZK or Kafka and read it back after restart
I am using Kafka 0.8.2 to receive data from AdExchange, and I use Spark Streaming 1.4.1 to store the data in MongoDB.

My problem is when I restart my Spark Streaming job, for instance to deploy a new version, fix a bug, or add new features: it continues reading from the latest Kafka offset at that moment, so I lose the data AdX pushed to Kafka while the job was restarting.

I tried something like auto.offset.reset -> smallest, but then it consumes everything from offset 0 to the latest, so the data is huge and gets duplicated in the DB.

I also tried setting a specific configuration in Spark, but the result was the same.

How can I save the latest offset that Spark consumed to ZK or Kafka, so that after a restart the job can read back from that offset up to the latest?
Here's some code you can use to store offsets in ZK: http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/

And here's some code showing how to pass those offsets back in when you call KafkaUtils.createDirectStream: http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/
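In case those links rot, here is a minimal sketch of the serialization side of that approach. It assumes the offsets for a consumer group are kept as one small string in a single znode; the encoding scheme and topic names are my own invention, and the actual znode reads/writes would go through a ZooKeeper client's setData/getData calls (shown only in comments so the sketch runs standalone):

```scala
// Sketch: encode/decode consumed offsets as a compact string so they can be
// stored in one znode (written with zk.setData on shutdown or after each batch,
// read back with zk.getData on startup). Assumes topic names contain no commas.
object OffsetCodec {
  // One "topic,partition,offset" triple per line.
  def encode(offsets: Map[(String, Int), Long]): String =
    offsets.map { case ((topic, part), off) => s"$topic,$part,$off" }.mkString("\n")

  def decode(s: String): Map[(String, Int), Long] =
    s.split("\n").filter(_.nonEmpty).map { line =>
      val Array(topic, part, off) = line.split(",")
      (topic, part.toInt) -> off.toLong
    }.toMap
}
```

On startup you would read the string back, decode it, and hand the resulting map to createDirectStream as the starting offsets.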
I haven't figured this out 100% yet, but your best bet is probably to set up JavaStreamingContext.checkpoint().

According to this blog post (https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md) there are some caveats, but it almost feels like they involve fringe cases that are only alluded to rather than actually explained.
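For reference, the restart-safe pattern is: point the context at a checkpoint directory and build it through StreamingContext.getOrCreate, which restores from the checkpoint if one exists and otherwise calls your factory function. Below is a toy, Spark-free model of that contract (the real calls are ssc.checkpoint(dir) and StreamingContext.getOrCreate(dir, createContext _); this just illustrates the restore-or-create semantics):

```scala
import java.nio.file.{Files, Paths}

// Toy model of the StreamingContext.getOrCreate contract: restore state from
// the checkpoint directory when it exists, otherwise build fresh state.
def getOrCreate[S](checkpointDir: String)(restore: String => S)(create: () => S): S =
  if (Files.exists(Paths.get(checkpointDir))) restore(checkpointDir)
  else create()
```

The important consequence is that your stream-building code must live inside the factory: on a clean start it runs, and on a restart the whole DAG plus offsets are recovered from the checkpoint instead.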
One of the overloads of the createDirectStream function takes a map whose keys identify a topic and partition and whose values are the offsets from which you start consuming.
Just look at the API here: http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html. The map parameter I am talking about is usually called fromOffsets.
You can insert data to the map:
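For example (in the real API the key type is kafka.common.TopicAndPartition; it is mirrored as a local case class here so the sketch runs standalone, and the topic name and offset values are made up):

```scala
import scala.collection.mutable

// Local stand-in for kafka.common.TopicAndPartition.
case class TopicAndPartition(topic: String, partition: Int)

// Keys identify a topic partition; values are the offsets to start consuming from.
val startOffsetsMap = mutable.Map[TopicAndPartition, Long]()
startOffsetsMap.put(TopicAndPartition("adx-events", 0), 1000L) // illustrative values
startOffsetsMap.put(TopicAndPartition("adx-events", 1), 1042L)
```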
And use it when you create the direct stream:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))
After each iteration you can get the processed offsets using:
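The direct stream's RDDs expose the consumed ranges via rdd.asInstanceOf[HasOffsetRanges].offsetRanges, where each OffsetRange carries the topic, partition, fromOffset and untilOffset. Here is a standalone sketch of turning those ranges into the next starting-offsets map (OffsetRange is mirrored locally so this runs without Spark on the classpath):

```scala
// Local mirror of org.apache.spark.streaming.kafka.OffsetRange's fields.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Resume at untilOffset: it is the first offset that has NOT yet been processed.
def toFromOffsets(ranges: Seq[OffsetRange]): Map[(String, Int), Long] =
  ranges.map(r => (r.topic, r.partition) -> r.untilOffset).toMap
```

Persist that map (to ZK, Kafka, or your DB) after each batch, and feed it back in as fromOffsets on the next start.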
You would be able to use this data to construct the fromOffsets map in the next iteration.
You can see the full code and usage here: https://spark.apache.org/docs/latest/streaming-kafka-integration.html at the end of the page
To add to Michael Kopaniov's answer, if you really want to use ZK as the place you store and load your map of offsets from, you can.
However, because your results are not being output to ZK, you will not get reliable semantics unless your output operation is idempotent (which it sounds like it isn't).
If it's possible to store your results in the same MongoDB document as the offsets in a single atomic action, that might be better for you.
For more detail, see https://www.youtube.com/watch?v=fXnNEq1v3VA
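As a sketch of that idea, each batch's output could be a single document holding both the results and the offsets, written with one upsert so they commit or fail together. The field names and document shape below are made up, and the actual write would go through your MongoDB driver:

```scala
// Build the combined batch document. Writing it in a single upsert keyed by
// batch id makes the results and offsets atomic with respect to each other,
// and makes replays of the same batch overwrite rather than duplicate.
def batchDocument(batchId: Long,
                  results: Seq[(String, Int)],
                  offsets: Map[(String, Int), Long]): Map[String, Any] =
  Map(
    "_id"     -> batchId,
    "results" -> results,
    "offsets" -> offsets.map { case ((topic, part), off) => s"$topic-$part" -> off }
  )
```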