author    Hari Shreedharan <hshreedharan@apache.org>  2015-02-04 14:20:44 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2015-02-04 14:20:44 -0800
commit    f0500f9fa378d81e4b4038a66a40eee15806b677 (patch)
tree      d21e8c989705e05acf4c59c6c920af18c6e38010 /external
parent    b0c0021953826bccaee818a54afc44e8bdfa8572 (diff)
[SPARK-4707][STREAMING] Reliable Kafka Receiver can lose data if the block generator fails to store data.

The Reliable Kafka Receiver commits offsets only when events are actually stored, which ensures that on restart it picks up where it left off. But if the failure happens in the store() call and the block generator reports an error, the receiver does nothing and keeps reading from the current offset rather than the last committed one, so messages between the last commit and the current offset are lost. This PR retries the store call four times and then stops the receiver with an error message and the last exception received from the store.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #3655 from harishreedharan/kafka-failure-fix and squashes the following commits:

5e2e7ad [Hari Shreedharan] [SPARK-4704][STREAMING] Reliable Kafka Receiver can lose data if the block generator fails to store data.
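To make the retry logic easier to see outside the diff context, here is a minimal, self-contained Scala sketch of the same bounded-retry-then-stop pattern. The names storeBlock, commitOffsets, and stopReceiver are hypothetical stand-ins for the receiver's real store(), commitOffset, and stop() calls, not the actual Spark API.

// A standalone sketch of the bounded-retry pattern this patch introduces.
// storeBlock, commitOffsets, and stopReceiver are hypothetical stand-ins
// for the receiver's real store(), commitOffset, and stop() methods.
object RetryStoreSketch {
  val MaxAttempts = 4 // one initial try plus three retries, as in the patch

  def storeWithRetry(
      storeBlock: () => Unit,
      commitOffsets: () => Unit,
      stopReceiver: (String, Throwable) => Unit): Unit = {
    var attempts = 0
    var stored = false
    var lastError: Exception = null
    while (!stored && attempts < MaxAttempts) {
      try {
        storeBlock()
        stored = true
      } catch {
        case e: Exception =>
          attempts += 1
          lastError = e // keep the most recent failure for the stop message
      }
    }
    if (stored) {
      commitOffsets() // only commit offsets once the block is safely stored
    } else {
      stopReceiver("Error while storing block into Spark", lastError)
    }
  }

  def main(args: Array[String]): Unit = {
    var failuresLeft = 2 // simulate two transient store failures
    storeWithRetry(
      storeBlock = () => {
        if (failuresLeft > 0) {
          failuresLeft -= 1
          throw new RuntimeException("transient store failure")
        }
      },
      commitOffsets = () => println("offsets committed"),
      stopReceiver = (msg, e) => println(s"receiver stopped: $msg ($e)"))
  }
}

Bounding the retries keeps a persistent storage failure from silently dropping data: if the block never lands, the receiver stops with the last exception instead of advancing past uncommitted offsets.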
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala | 27
1 file changed, 23 insertions, 4 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index be734b8027..c4a44c1822 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -201,12 +201,31 @@ class ReliableKafkaReceiver[
topicPartitionOffsetMap.clear()
}
- /** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
+ /**
+ * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
+ * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
+ */
private def storeBlockAndCommitOffset(
blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
- store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
- Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
- blockOffsetMap.remove(blockId)
+ var count = 0
+ var pushed = false
+ var exception: Exception = null
+ while (!pushed && count <= 3) {
+ try {
+ store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+ pushed = true
+ } catch {
+ case ex: Exception =>
+ count += 1
+ exception = ex
+ }
+ }
+ if (pushed) {
+ Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+ blockOffsetMap.remove(blockId)
+ } else {
+ stop("Error while storing block into Spark", exception)
+ }
}
/**