Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala | 27
1 file changed, 23 insertions(+), 4 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index be734b8027..c4a44c1822 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -201,12 +201,31 @@ class ReliableKafkaReceiver[
     topicPartitionOffsetMap.clear()
   }
 
-  /** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
+  /**
+   * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
+   * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
+   */
   private def storeBlockAndCommitOffset(
       blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-    store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
-    Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
-    blockOffsetMap.remove(blockId)
+    var count = 0
+    var pushed = false
+    var exception: Exception = null
+    while (!pushed && count <= 3) {
+      try {
+        store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+        pushed = true
+      } catch {
+        case ex: Exception =>
+          count += 1
+          exception = ex
+      }
+    }
+    if (pushed) {
+      Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+      blockOffsetMap.remove(blockId)
+    } else {
+      stop("Error while storing block into Spark", exception)
+    }
   }
 
   /**
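The change above replaces the single unguarded store(...) call with a bounded retry loop: up to four attempts (count <= 3), remembering only the most recent exception, and stopping the receiver only after every attempt has failed. The standalone Scala sketch below illustrates the same pattern outside the receiver; pushBlock, MaxRetries, BoundedRetrySketch, and the rethrow at the end are hypothetical stand-ins for store(...), the patch's hard-coded limit, and the receiver's stop(...) call, not part of the actual class.

import scala.collection.mutable

// Standalone sketch of the bounded-retry pattern from the patch.
// `pushBlock` and `MaxRetries` are hypothetical; the receiver itself
// calls `store(...)` with the hard-coded bound `count <= 3`.
object BoundedRetrySketch {
  private val MaxRetries = 3

  def storeWithRetry[T](buffer: mutable.ArrayBuffer[T])
                       (pushBlock: mutable.ArrayBuffer[T] => Unit): Unit = {
    var count = 0
    var pushed = false
    var exception: Exception = null
    while (!pushed && count <= MaxRetries) {
      try {
        pushBlock(buffer)          // may throw on a transient failure
        pushed = true
      } catch {
        case ex: Exception =>
          count += 1
          exception = ex           // keep only the most recent failure
      }
    }
    if (!pushed) {
      // The receiver calls stop(...) here; a plain rethrow keeps the
      // sketch self-contained.
      throw new RuntimeException("Error while storing block", exception)
    }
  }

  def main(args: Array[String]): Unit = {
    var failuresLeft = 2 // fail twice, succeed on the third attempt
    storeWithRetry(mutable.ArrayBuffer(1, 2, 3)) { buf =>
      if (failuresLeft > 0) { failuresLeft -= 1; sys.error("transient push failure") }
      println(s"stored ${buf.size} records")
    }
  }
}

Note that the loop retries immediately with no backoff, so a persistent failure exhausts all attempts back-to-back; that matches the patch, which relies on stopping the receiver rather than waiting out a transient outage.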