aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-08-18 20:15:54 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-18 20:15:54 -0700
commit90273eff9604439a5a5853077e232d34555c67d7 (patch)
treee6171b6c9a7c7683ae3f155b145043760dac9777 /streaming
parent1aeae05bb20f01ab7ccaa62fe905a63e020074b5 (diff)
downloadspark-90273eff9604439a5a5853077e232d34555c67d7.tar.gz
spark-90273eff9604439a5a5853077e232d34555c67d7.tar.bz2
spark-90273eff9604439a5a5853077e232d34555c67d7.zip
[SPARK-10102] [STREAMING] Fix a race condition that startReceiver may happen before setting trackerState to Started
Test failure: https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-with-YARN/HADOOP_PROFILE=hadoop-2.4,label=spark-test/3305/testReport/junit/org.apache.spark.streaming/StreamingContextSuite/stop_gracefully/ There is a race condition that setting `trackerState` to `Started` could happen after calling `startReceiver`. Then `startReceiver` won't start the receivers because it uses `! isTrackerStarted` to check if ReceiverTracker is stopping or stopped. But actually, `trackerState` is `Initialized` and will be changed to `Started` soon. Therefore, we should use `isTrackerStopping || isTrackerStopped`. Author: zsxwing <zsxwing@gmail.com> Closes #8294 from zsxwing/SPARK-9504.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala11
1 files changed, 8 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index e076fb5ea1..aae3acf7ab 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -468,8 +468,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
* Start a receiver along with its scheduled executors
*/
private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
+ def shouldStartReceiver: Boolean = {
+ // It's okay to start when trackerState is Initialized or Started
+ !(isTrackerStopping || isTrackerStopped)
+ }
+
val receiverId = receiver.streamId
- if (!isTrackerStarted) {
+ if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
return
}
@@ -494,14 +499,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
case Success(_) =>
- if (!isTrackerStarted) {
+ if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
case Failure(e) =>
- if (!isTrackerStarted) {
+ if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError("Receiver has been stopped. Try to restart it.", e)