diff options
author | zsxwing <zsxwing@gmail.com> | 2015-08-18 20:15:54 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-18 20:16:18 -0700 |
commit | a6f8979c81c5355759f74e8b3c9eb3cafb6a9c7f (patch) | |
tree | 485aac5d6127b684489e70178d115d694b0df9a0 /streaming | |
parent | 08c5962a251555e7d34460135ab6c32cce584704 (diff) | |
download | spark-a6f8979c81c5355759f74e8b3c9eb3cafb6a9c7f.tar.gz spark-a6f8979c81c5355759f74e8b3c9eb3cafb6a9c7f.tar.bz2 spark-a6f8979c81c5355759f74e8b3c9eb3cafb6a9c7f.zip |
[SPARK-10102] [STREAMING] Fix a race condition that startReceiver may happen before setting trackerState to Started
Test failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-with-YARN/HADOOP_PROFILE=hadoop-2.4,label=spark-test/3305/testReport/junit/org.apache.spark.streaming/StreamingContextSuite/stop_gracefully/
There is a race condition that setting `trackerState` to `Started` could happen after calling `startReceiver`. Then `startReceiver` won't start the receivers because it uses `! isTrackerStarted` to check if ReceiverTracker is stopping or stopped. But actually, `trackerState` is `Initialized` and will be changed to `Started` soon.
Therefore, we should use `isTrackerStopping || isTrackerStopped`.
Author: zsxwing <zsxwing@gmail.com>
Closes #8294 from zsxwing/SPARK-9504.
(cherry picked from commit 90273eff9604439a5a5853077e232d34555c67d7)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index e076fb5ea1..aae3acf7ab 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -468,8 +468,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false * Start a receiver along with its scheduled executors */ private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = { + def shouldStartReceiver: Boolean = { + // It's okay to start when trackerState is Initialized or Started + !(isTrackerStopping || isTrackerStopped) + } + val receiverId = receiver.streamId - if (!isTrackerStarted) { + if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) return } @@ -494,14 +499,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // We will keep restarting the receiver job until ReceiverTracker is stopped future.onComplete { case Success(_) => - if (!isTrackerStarted) { + if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { logInfo(s"Restarting Receiver $receiverId") self.send(RestartReceiver(receiver)) } case Failure(e) => - if (!isTrackerStarted) { + if (!shouldStartReceiver) { onReceiverJobFinish(receiverId) } else { logError("Receiver has been stopped. Try to restart it.", e) |