aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-07-16 19:39:54 -0700
committerAndrew Or <andrew@databricks.com>2015-07-16 19:39:54 -0700
commit96aa3340f41d8de4560caec97e8f3de23252c792 (patch)
treefc2bbeb935e881bbd44f64fd43e11ad7c991b470 /core
parentd86bbb4e286f16f77ba125452b07827684eafeed (diff)
downloadspark-96aa3340f41d8de4560caec97e8f3de23252c792.tar.gz
spark-96aa3340f41d8de4560caec97e8f3de23252c792.tar.bz2
spark-96aa3340f41d8de4560caec97e8f3de23252c792.zip
[SPARK-8119] HeartbeatReceiver should replace executors, not kill
**Symptom.** If an executor in an application times out, `HeartbeatReceiver` attempts to kill it. After this happens, however, the application never gets an executor back even when there are cluster resources available. **Cause.** The issue is that `sc.killExecutor` automatically assumes that the application wishes to adjust its resource requirements permanently downwards. This is not the intention in `HeartbeatReceiver`, however, which simply wants a replacement for the expired executor. **Fix.** Differentiate between the intention to kill and the intention to replace an executor with a fresh one. More details can be found in the commit message. Author: Andrew Or <andrew@databricks.com> Closes #7107 from andrewor14/heartbeat-no-kill and squashes the following commits: 1cd2cd7 [Andrew Or] Add regression test for SPARK-8119 25a347d [Andrew Or] Reuse more code in scheduler backend 31ebd40 [Andrew Or] Differentiate between kill and replace
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala40
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala40
-rw-r--r--core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala147
4 files changed, 194 insertions, 37 deletions
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 221b1dab43..43dd4a1707 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -181,7 +181,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// Asynchronously kill the executor to avoid blocking the current thread
killExecutorThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
- sc.killExecutor(executorId)
+ // Note: we want to get an executor back after expiring this one,
+ // so do not simply call `sc.killExecutor` here (SPARK-8119)
+ sc.killAndReplaceExecutor(executorId)
}
})
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bd1cc332a6..d00c012d80 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1419,6 +1419,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* :: DeveloperApi ::
* Request that the cluster manager kill the specified executors.
+ *
+ * Note: This is an indication to the cluster manager that the application wishes to adjust
+ * its resource usage downwards. If the application wishes to replace the executors it kills
+ * through this method with new ones, it should follow up explicitly with a call to
+ * {{SparkContext#requestExecutors}}.
+ *
* This is currently only supported in YARN mode. Return whether the request is received.
*/
@DeveloperApi
@@ -1436,12 +1442,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* :: DeveloperApi ::
- * Request that cluster manager the kill the specified executor.
- * This is currently only supported in Yarn mode. Return whether the request is received.
+ * Request that the cluster manager kill the specified executor.
+ *
+ * Note: This is an indication to the cluster manager that the application wishes to adjust
+ * its resource usage downwards. If the application wishes to replace the executor it kills
+ * through this method with a new one, it should follow up explicitly with a call to
+ * {{SparkContext#requestExecutors}}.
+ *
+ * This is currently only supported in YARN mode. Return whether the request is received.
*/
@DeveloperApi
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
+ /**
+ * Request that the cluster manager kill the specified executor without adjusting the
+ * application resource requirements.
+ *
+ * The effect is that a new executor will be launched in place of the one killed by
+ * this request. This assumes the cluster manager will automatically and eventually
+ * fulfill all missing application resource requests.
+ *
+ * Note: The replace is by no means guaranteed; another application on the same cluster
+ * can steal the window of opportunity and acquire this application's resources in the
+ * mean time.
+ *
+ * This is currently only supported in YARN mode. Return whether the request is received.
+ */
+ private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
+ schedulerBackend match {
+ case b: CoarseGrainedSchedulerBackend =>
+ b.killExecutors(Seq(executorId), replace = true)
+ case _ =>
+ logWarning("Killing executors is only supported in coarse-grained mode")
+ false
+ }
+ }
+
/** The version of Spark on which this application is running. */
def version: String = SPARK_VERSION
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7c7f70d8a1..0e3215d6e9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -371,26 +371,36 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Request that the cluster manager kill the specified executors.
- * Return whether the kill request is acknowledged.
+ * @return whether the kill request is acknowledged.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+ killExecutors(executorIds, replace = false)
+ }
+
+ /**
+ * Request that the cluster manager kill the specified executors.
+ *
+ * @param executorIds identifiers of executors to kill
+ * @param replace whether to replace the killed executors with new ones
+ * @return whether the kill request is acknowledged.
+ */
+ final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
- val filteredExecutorIds = new ArrayBuffer[String]
- executorIds.foreach { id =>
- if (executorDataMap.contains(id)) {
- filteredExecutorIds += id
- } else {
- logWarning(s"Executor to kill $id does not exist!")
- }
+ val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
+ unknownExecutors.foreach { id =>
+ logWarning(s"Executor to kill $id does not exist!")
+ }
+
+ // If we do not wish to replace the executors we kill, sync the target number of executors
+ // with the cluster manager to avoid allocating new ones. When computing the new target,
+ // take into account executors that are pending to be added or removed.
+ if (!replace) {
+ doRequestTotalExecutors(numExistingExecutors + numPendingExecutors
+ - executorsPendingToRemove.size - knownExecutors.size)
}
- // Killing executors means effectively that we want less executors than before, so also update
- // the target number of executors to avoid having the backend allocate new ones.
- val newTotal = (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
- - filteredExecutorIds.size)
- doRequestTotalExecutors(newTotal)
- executorsPendingToRemove ++= filteredExecutorIds
- doKillExecutors(filteredExecutorIds)
+ executorsPendingToRemove ++= knownExecutors
+ doKillExecutors(knownExecutors)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index b31b091966..5a2670e4d1 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -17,6 +17,9 @@
package org.apache.spark
+import java.util.concurrent.{ExecutorService, TimeUnit}
+
+import scala.collection.mutable
import scala.language.postfixOps
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
@@ -25,11 +28,16 @@ import org.mockito.Matchers
import org.mockito.Matchers._
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv, RpcEndpointRef}
import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.ManualClock
+/**
+ * A test suite for the heartbeating behavior between the driver and the executors.
+ */
class HeartbeatReceiverSuite
extends SparkFunSuite
with BeforeAndAfterEach
@@ -40,23 +48,40 @@ class HeartbeatReceiverSuite
private val executorId2 = "executor-2"
// Shared state that must be reset before and after each test
- private var scheduler: TaskScheduler = null
+ private var scheduler: TaskSchedulerImpl = null
private var heartbeatReceiver: HeartbeatReceiver = null
private var heartbeatReceiverRef: RpcEndpointRef = null
private var heartbeatReceiverClock: ManualClock = null
+ // Helper private method accessors for HeartbeatReceiver
+ private val _executorLastSeen = PrivateMethod[collection.Map[String, Long]]('executorLastSeen)
+ private val _executorTimeoutMs = PrivateMethod[Long]('executorTimeoutMs)
+ private val _killExecutorThread = PrivateMethod[ExecutorService]('killExecutorThread)
+
+ /**
+ * Before each test, set up the SparkContext and a custom [[HeartbeatReceiver]]
+ * that uses a manual clock.
+ */
override def beforeEach(): Unit = {
- sc = spy(new SparkContext("local[2]", "test"))
- scheduler = mock(classOf[TaskScheduler])
+ val conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.dynamicAllocation.testing", "true")
+ sc = spy(new SparkContext(conf))
+ scheduler = mock(classOf[TaskSchedulerImpl])
when(sc.taskScheduler).thenReturn(scheduler)
+ when(scheduler.sc).thenReturn(sc)
heartbeatReceiverClock = new ManualClock
heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
heartbeatReceiverRef = sc.env.rpcEnv.setupEndpoint("heartbeat", heartbeatReceiver)
when(scheduler.executorHeartbeatReceived(any(), any(), any())).thenReturn(true)
}
+ /**
+ * After each test, clean up all state and stop the [[SparkContext]].
+ */
override def afterEach(): Unit = {
- resetSparkContext()
+ super.afterEach()
scheduler = null
heartbeatReceiver = null
heartbeatReceiverRef = null
@@ -75,7 +100,7 @@ class HeartbeatReceiverSuite
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
triggerHeartbeat(executorId1, executorShouldReregister = false)
triggerHeartbeat(executorId2, executorShouldReregister = false)
- val trackedExecutors = executorLastSeen(heartbeatReceiver)
+ val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
assert(trackedExecutors.size === 2)
assert(trackedExecutors.contains(executorId1))
assert(trackedExecutors.contains(executorId2))
@@ -83,15 +108,15 @@ class HeartbeatReceiverSuite
test("reregister if scheduler is not ready yet") {
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
- // Task scheduler not set in HeartbeatReceiver
+ // Task scheduler is not set yet in HeartbeatReceiver, so executors should reregister
triggerHeartbeat(executorId1, executorShouldReregister = true)
}
test("reregister if heartbeat from unregistered executor") {
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
- // Received heartbeat from unknown receiver, so we ask it to re-register
+ // Received heartbeat from unknown executor, so we ask it to re-register
triggerHeartbeat(executorId1, executorShouldReregister = true)
- assert(executorLastSeen(heartbeatReceiver).isEmpty)
+ assert(heartbeatReceiver.invokePrivate(_executorLastSeen()).isEmpty)
}
test("reregister if heartbeat from removed executor") {
@@ -104,14 +129,14 @@ class HeartbeatReceiverSuite
// A heartbeat from the second executor should require reregistering
triggerHeartbeat(executorId1, executorShouldReregister = false)
triggerHeartbeat(executorId2, executorShouldReregister = true)
- val trackedExecutors = executorLastSeen(heartbeatReceiver)
+ val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
assert(trackedExecutors.size === 1)
assert(trackedExecutors.contains(executorId1))
assert(!trackedExecutors.contains(executorId2))
}
test("expire dead hosts") {
- val executorTimeout = executorTimeoutMs(heartbeatReceiver)
+ val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
@@ -124,12 +149,61 @@ class HeartbeatReceiverSuite
heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
// Only the second executor should be expired as a dead host
verify(scheduler).executorLost(Matchers.eq(executorId2), any())
- val trackedExecutors = executorLastSeen(heartbeatReceiver)
+ val trackedExecutors = heartbeatReceiver.invokePrivate(_executorLastSeen())
assert(trackedExecutors.size === 1)
assert(trackedExecutors.contains(executorId1))
assert(!trackedExecutors.contains(executorId2))
}
+ test("expire dead hosts should kill executors with replacement (SPARK-8119)") {
+ // Set up a fake backend and cluster manager to simulate killing executors
+ val rpcEnv = sc.env.rpcEnv
+ val fakeClusterManager = new FakeClusterManager(rpcEnv)
+ val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
+ val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef)
+ when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)
+
+ // Register fake executors with our fake scheduler backend
+ // This is necessary because the backend refuses to kill executors it does not know about
+ fakeSchedulerBackend.start()
+ val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv)
+ val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
+ val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
+ val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
+ fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+ RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
+ fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+ RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
+ heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
+ heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId1, null))
+ heartbeatReceiver.onExecutorAdded(SparkListenerExecutorAdded(0, executorId2, null))
+ triggerHeartbeat(executorId1, executorShouldReregister = false)
+ triggerHeartbeat(executorId2, executorShouldReregister = false)
+
+ // Adjust the target number of executors on the cluster manager side
+ assert(fakeClusterManager.getTargetNumExecutors === 0)
+ sc.requestTotalExecutors(2)
+ assert(fakeClusterManager.getTargetNumExecutors === 2)
+ assert(fakeClusterManager.getExecutorIdsToKill.isEmpty)
+
+ // Expire the executors. This should trigger our fake backend to kill the executors.
+ // Since the kill request is sent to the cluster manager asynchronously, we need to block
+ // on the kill thread to ensure that the cluster manager actually received our requests.
+ // Here we use a timeout of O(seconds), but in practice this whole test takes O(10ms).
+ val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
+ heartbeatReceiverClock.advance(executorTimeout * 2)
+ heartbeatReceiverRef.askWithRetry[Boolean](ExpireDeadHosts)
+ val killThread = heartbeatReceiver.invokePrivate(_killExecutorThread())
+ killThread.shutdown() // needed for awaitTermination
+ killThread.awaitTermination(10L, TimeUnit.SECONDS)
+
+ // The target number of executors should not change! Otherwise, having an expired
+ // executor means we permanently adjust the target number downwards until we
+ // explicitly request new executors. For more detail, see SPARK-8119.
+ assert(fakeClusterManager.getTargetNumExecutors === 2)
+ assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2))
+ }
+
/** Manually send a heartbeat and return the response. */
private def triggerHeartbeat(
executorId: String,
@@ -148,14 +222,49 @@ class HeartbeatReceiverSuite
}
}
- // Helper methods to access private fields in HeartbeatReceiver
- private val _executorLastSeen = PrivateMethod[collection.Map[String, Long]]('executorLastSeen)
- private val _executorTimeoutMs = PrivateMethod[Long]('executorTimeoutMs)
- private def executorLastSeen(receiver: HeartbeatReceiver): collection.Map[String, Long] = {
- receiver invokePrivate _executorLastSeen()
+}
+
+// TODO: use these classes to add end-to-end tests for dynamic allocation!
+
+/**
+ * Dummy RPC endpoint to simulate executors.
+ */
+private class FakeExecutorEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint
+
+/**
+ * Dummy scheduler backend to simulate executor allocation requests to the cluster manager.
+ */
+private class FakeSchedulerBackend(
+ scheduler: TaskSchedulerImpl,
+ rpcEnv: RpcEnv,
+ clusterManagerEndpoint: RpcEndpointRef)
+ extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+ protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ clusterManagerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal))
}
- private def executorTimeoutMs(receiver: HeartbeatReceiver): Long = {
- receiver invokePrivate _executorTimeoutMs()
+
+ protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ clusterManagerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
}
+}
+/**
+ * Dummy cluster manager to simulate responses to executor allocation requests.
+ */
+private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoint {
+ private var targetNumExecutors = 0
+ private val executorIdsToKill = new mutable.HashSet[String]
+
+ def getTargetNumExecutors: Int = targetNumExecutors
+ def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case RequestExecutors(requestedTotal) =>
+ targetNumExecutors = requestedTotal
+ context.reply(true)
+ case KillExecutors(executorIds) =>
+ executorIdsToKill ++= executorIds
+ context.reply(true)
+ }
}