 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala |  7 +++++++
 yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                   |  1 -
 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                       | 13 ++++++++-----
 yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala                  | 22 ++++++++++++++++++++++
 4 files changed, 37 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6f77fa32ce..87ebf31139 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -211,6 +211,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
+            addressToExecutorId -= executorInfo.executorAddress
             executorDataMap -= executorId
             executorsPendingToRemove -= executorId
           }
@@ -371,6 +372,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
         logWarning(s"Executor to kill $id does not exist!")
       }
     }
+    // Killing executors effectively means that we want fewer executors than before, so also update
+    // the target number of executors to avoid having the backend allocate new ones.
+    val newTotal = (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
+      - filteredExecutorIds.size)
+    doRequestTotalExecutors(newTotal)
+
     executorsPendingToRemove ++= filteredExecutorIds
     doKillExecutors(filteredExecutorIds)
   }
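The newTotal arithmetic above is the heart of the core-side change: a kill request must also lower the desired total, otherwise the backend would immediately ask the cluster manager to replace the executors it just killed. A minimal, self-contained sketch of that bookkeeping (standalone Scala with hypothetical values; only the counter roles come from the patch):

    // Sketch of the target-total arithmetic in killExecutors, assuming:
    //   numExisting     - executors currently registered with the backend
    //   numPending      - executors requested but not yet registered
    //   pendingToRemove - kill requests already in flight
    //   toKill          - executors being killed by this call
    object TargetTotalSketch {
      def newTotal(numExisting: Int, numPending: Int, pendingToRemove: Int, toKill: Int): Int =
        numExisting + numPending - pendingToRemove - toKill

      def main(args: Array[String]): Unit = {
        // 4 registered, 0 pending, 0 already being removed; killing 3 leaves a target of 1
        println(newTotal(4, 0, 0, 3)) // prints 1
      }
    }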
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 056b8c0257..3d18690cd9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -534,7 +534,6 @@ private[spark] class ApplicationMaster(
         driver ! x

       case RequestExecutors(requestedTotal) =>
-        logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
         Option(allocator) match {
           case Some(a) => a.requestTotalExecutors(requestedTotal)
           case None => logWarning("Container allocator is not ready to request executors yet.")
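The logInfo removed here is not lost; it moves into YarnAllocator.requestTotalExecutors (next file), where it fires only when the requested total actually changes. A rough standalone sketch of that log-only-on-change pattern, with illustrative names rather than Spark's own:

    // Hypothetical stand-in for the guard added in YarnAllocator.requestTotalExecutors:
    // repeated RequestExecutors messages carrying the same total no longer spam the log.
    class TargetTracker {
      private var target = 0
      def request(requestedTotal: Int): Unit = synchronized {
        if (requestedTotal != target) {
          println(s"Driver requested a total number of $requestedTotal executor(s).")
          target = requestedTotal
        }
      }
    }

    object TargetTrackerDemo {
      def main(args: Array[String]): Unit = {
        val tracker = new TargetTracker
        tracker.request(3) // logs
        tracker.request(3) // silent: no change
        tracker.request(1) // logs
      }
    }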
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 55bfbcd9cb..c98763e15b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -86,7 +86,8 @@ private[yarn] class YarnAllocator(
   @volatile private var targetNumExecutors = args.numExecutors

   // Keep track of which container is running which executor, so the executors can be removed later.
-  private val executorIdToContainer = new HashMap[String, Container]
+  // Visible for testing.
+  private[yarn] val executorIdToContainer = new HashMap[String, Container]

   // Executor memory in MB.
   protected val executorMemory = args.executorMemory
@@ -137,7 +138,10 @@
    * be killed.
    */
   def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
-    targetNumExecutors = requestedTotal
+    if (requestedTotal != targetNumExecutors) {
+      logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
+      targetNumExecutors = requestedTotal
+    }
   }

   /**
@@ -148,8 +152,6 @@
       val container = executorIdToContainer.remove(executorId).get
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
-      targetNumExecutors -= 1
-      assert(targetNumExecutors >= 0, "Allocator killed more executors than are allocated!")
     } else {
       logWarning(s"Attempted to kill unknown executor $executorId!")
     }
@@ -351,7 +353,8 @@
     }
   }

-  private def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
+  // Visible for testing.
+  private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
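Dropping the targetNumExecutors decrement (and its assert) from killExecutor keeps each kill from being counted twice: the driver now folds kills into the total it sends (the core change above), so decrementing again on the allocator side would leave the target permanently too low. A small simulation of the off-by-one this avoids (hypothetical numbers, not Spark code):

    // With 4 executors and 1 kill, the driver already requests a total of 3.
    object DoubleCountSketch {
      def main(args: Array[String]): Unit = {
        var targetNumExecutors = 4
        val killed = 1
        targetNumExecutors = 4 - killed // driver side: doRequestTotalExecutors(newTotal)
        // The old allocator-side killExecutor would additionally have run:
        //   targetNumExecutors -= 1    // leaving the target at 2, one executor short for good
        println(targetNumExecutors)    // 3: the kill is counted exactly once
      }
    }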
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 3c224f1488..c09b01bafc 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -206,6 +206,28 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
     handler.getNumExecutorsRunning should be (2)
   }

+  test("kill executors") {
+    val handler = createAllocator(4)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+
+    handler.requestTotalExecutors(1)
+    handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id) }
+
+    val statuses = Seq(container1, container2).map { c =>
+      ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
+    }
+    handler.updateResourceRequests()
+    handler.processCompletedContainers(statuses.toSeq)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (1)
+  }
+
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +