author     zsxwing <zsxwing@gmail.com>             2015-10-01 07:09:31 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>    2015-10-01 07:09:31 -0700
commit     9b3e7768a27d51ddd4711c4a68a428a6875bd6d7 (patch)
tree       d7bd49b1339141c6781e0e1b2735f93dd14dc84b /core/src/main/scala/org/apache
parent     f21e2da03fbf8041fece476e3d5c699aef819451 (diff)
download   spark-9b3e7768a27d51ddd4711c4a68a428a6875bd6d7.tar.gz
           spark-9b3e7768a27d51ddd4711c4a68a428a6875bd6d7.tar.bz2
           spark-9b3e7768a27d51ddd4711c4a68a428a6875bd6d7.zip
[SPARK-10058] [CORE] [TESTS] Fix the flaky tests in HeartbeatReceiverSuite
Fixed the test failure here: https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Test/job/Spark-1.5-SBT/116/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/testReport/junit/org.apache.spark/HeartbeatReceiverSuite/normal_heartbeat/

This failure occurs because `HeartbeatReceiverSuite.heartbeatReceiver` may receive a `SparkListenerExecutorAdded("driver")` event sent from [LocalBackend](https://github.com/apache/spark/blob/8fb3a65cbb714120d612e58ef9d12b0521a83260/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala#L121).

There are other race conditions in `HeartbeatReceiverSuite` because `HeartbeatReceiver.onExecutorAdded` and `HeartbeatReceiver.onExecutorRemoved` are asynchronous. This PR also fixes them.

Author: zsxwing <zsxwing@gmail.com>

Closes #8946 from zsxwing/SPARK-10058.
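For context, a rough sketch of why asserting right after the listener callback is flaky: `onExecutorAdded` only fires an asynchronous `ask` at the endpoint. The snippet below is illustrative only (not the actual suite code) and assumes it lives in the `org.apache.spark` package, since `HeartbeatReceiver` is `private[spark]`:

```scala
import org.apache.spark.scheduler.SparkListenerExecutorAdded
import org.apache.spark.scheduler.cluster.ExecutorInfo

// Race-prone pattern: onExecutorAdded merely sends ExecutorRegistered to the
// endpoint asynchronously, so nothing guarantees the message has been
// processed by the time this method returns.
def raceProneRegister(receiver: HeartbeatReceiver): Unit = {
  receiver.onExecutorAdded(
    SparkListenerExecutorAdded(0L, "executor-1",
      new ExecutorInfo("localhost", 1, Map.empty)))
  // Assertions made here may still observe state from before the event.
}
```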
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 25
1 file changed, 23 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index ee60d697d8..1f1f0b75de 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
+import scala.concurrent.Future
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
@@ -148,10 +149,30 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
}
/**
+ * Send ExecutorRegistered to the event loop to add a new executor. Only for test.
+ *
+ * @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
+ * indicate if this operation is successful.
+ */
+ def addExecutor(executorId: String): Option[Future[Boolean]] = {
+ Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
+ }
+
+ /**
* If the heartbeat receiver is not stopped, notify it of executor registrations.
*/
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
- Option(self).foreach(_.ask[Boolean](ExecutorRegistered(executorAdded.executorId)))
+ addExecutor(executorAdded.executorId)
+ }
+
+ /**
+ * Send ExecutorRemoved to the event loop to remove a executor. Only for test.
+ *
+ * @return if HeartbeatReceiver is stopped, return None. Otherwise, return a Some(Future) that
+ * indicate if this operation is successful.
+ */
+ def removeExecutor(executorId: String): Option[Future[Boolean]] = {
+ Option(self).map(_.ask[Boolean](ExecutorRemoved(executorId)))
}
/**
@@ -165,7 +186,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
* and expire it with loud error messages.
*/
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
- Option(self).foreach(_.ask[Boolean](ExecutorRemoved(executorRemoved.executorId)))
+ removeExecutor(executorRemoved.executorId)
}
private def expireDeadHosts(): Unit = {
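For illustration, a minimal sketch of how a test could use the new helpers to avoid the race, assuming a started `HeartbeatReceiver` endpoint (the `SparkContext`/`RpcEnv` setup and the actual suite code are omitted; the object and method names here are hypothetical). Because `HeartbeatReceiver` is `private[spark]`, such a helper would need to sit under the `org.apache.spark` package:

```scala
package org.apache.spark

import scala.concurrent.Await
import scala.concurrent.duration._

object HeartbeatReceiverTestHelper {
  // Hypothetical helper: register an executor and block until the
  // HeartbeatReceiver's event loop has actually handled ExecutorRegistered,
  // so later assertions cannot race with the asynchronous ask.
  def registerAndAwait(receiver: HeartbeatReceiver, executorId: String): Unit = {
    receiver.addExecutor(executorId) match {
      case Some(future) =>
        // Completes with true once the endpoint has processed the message.
        assert(Await.result(future, 10.seconds))
      case None =>
        // `self` was null: the receiver was stopped before registration.
        throw new IllegalStateException("HeartbeatReceiver is stopped")
    }
  }
}
```

The same pattern applies to `removeExecutor`; awaiting the returned `Future` is what removes the nondeterminism described in the commit message.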