author     Marcelo Vanzin <vanzin@cloudera.com>    2015-03-18 09:18:28 -0400
committer  Sean Owen <sowen@cloudera.com>          2015-03-18 09:18:28 -0400
commit     981fbafa2a878e86abeefe1d77cca01fd848f9f6 (patch)
tree       dfbe80ea4bf639ee9d646d97af003469b8e8e4b7 /yarn
parent     9d112a958ee2facad179344dd367a6d1ccbc9614 (diff)
[SPARK-6325] [core,yarn] Do not change target executor count when killing executors.
The dynamic execution code has two ways to reduce the number of executors: one
where it reduces the total number of executors it wants, by asking for an
absolute number of executors that is lower than the previous one. The second is
by explicitly killing idle executors.

YarnAllocator was mixing those up and lowering the target number of executors
when a kill was issued. Instead, trust that the frontend knows what it's doing,
and kill executors without messing with other accounting. That means that if the
frontend kills an executor without lowering the target, it will get a new
executor shortly.

The one situation where both actions (lower the target and kill executor) need
to happen together is when user code explicitly calls
`SparkContext.killExecutors`. In that case, issue two calls to the backend to
achieve the goal.

I also did some minor cleanup in related code:

- avoid sending a request for executors when the target is unchanged, to avoid
  log spam in the AM
- avoid printing misleading log messages in the AM when there are no requests
  to cancel
- fix a slow memory leak plus a misleading error message on the driver, caused
  by failing to completely unregister the executor.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5018 from vanzin/SPARK-6325 and squashes the following commits:

2e782a3 [Marcelo Vanzin] Avoid redundant logging on the AM side.
a3567cd [Marcelo Vanzin] Add parentheses.
a363926 [Marcelo Vanzin] Update logic.
a158101 [Marcelo Vanzin] [SPARK-6325] [core,yarn] Disallow reducing executor count past running count.
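To make the accounting rule described above concrete, here is a small standalone
sketch (illustrative names only; this is not the patched Spark code): the target
count changes only when the frontend explicitly requests a new total, while
killing an executor just releases it, so a kill without a matching target
decrease leaves a shortfall that the allocator would fill with a replacement.

import scala.collection.mutable

// Toy model of the accounting rule from the commit message above.
// All names are invented for illustration; none of this is Spark code.
object AllocatorAccountingSketch {
  private val running = mutable.Set[String]()
  private var targetNumExecutors = 0

  // Only an explicit request from the frontend moves the target.
  def requestTotalExecutors(requestedTotal: Int): Unit = {
    if (requestedTotal != targetNumExecutors) {
      targetNumExecutors = requestedTotal
    }
  }

  // A kill releases the executor but leaves the target untouched.
  def killExecutor(id: String): Unit = {
    running -= id
  }

  def main(args: Array[String]): Unit = {
    requestTotalExecutors(2)
    running ++= Seq("exec-1", "exec-2")

    // A kill issued without lowering the target: the shortfall (target 2,
    // running 1) is what would prompt a replacement executor.
    killExecutor("exec-2")
    println(s"target=$targetNumExecutors running=${running.size}")  // target=2 running=1

    // SparkContext.killExecutors semantics: lower the target first, then kill,
    // so no replacement is requested.
    requestTotalExecutors(targetNumExecutors - 1)
    killExecutor("exec-1")
    println(s"target=$targetNumExecutors running=${running.size}")  // target=1 running=0
  }
}

In the actual patch, the two-call sequence for `SparkContext.killExecutors` lives
on the driver (core) side, which is why only the removal of the allocator-side
target adjustment shows up in the yarn-only diff below.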
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala   |  1
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala       | 13
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala  | 22
3 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 056b8c0257..3d18690cd9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -534,7 +534,6 @@ private[spark] class ApplicationMaster(
         driver ! x

       case RequestExecutors(requestedTotal) =>
-        logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
         Option(allocator) match {
           case Some(a) => a.requestTotalExecutors(requestedTotal)
           case None => logWarning("Container allocator is not ready to request executors yet.")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 55bfbcd9cb..c98763e15b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -86,7 +86,8 @@ private[yarn] class YarnAllocator(
   @volatile private var targetNumExecutors = args.numExecutors

   // Keep track of which container is running which executor to remove the executors later
-  private val executorIdToContainer = new HashMap[String, Container]
+  // Visible for testing.
+  private[yarn] val executorIdToContainer = new HashMap[String, Container]

   // Executor memory in MB.
   protected val executorMemory = args.executorMemory
@@ -137,7 +138,10 @@ private[yarn] class YarnAllocator(
    * be killed.
    */
   def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
-    targetNumExecutors = requestedTotal
+    if (requestedTotal != targetNumExecutors) {
+      logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
+      targetNumExecutors = requestedTotal
+    }
   }

   /**
@@ -148,8 +152,6 @@ private[yarn] class YarnAllocator(
       val container = executorIdToContainer.remove(executorId).get
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
-      targetNumExecutors -= 1
-      assert(targetNumExecutors >= 0, "Allocator killed more executors than are allocated!")
     } else {
       logWarning(s"Attempted to kill unknown executor $executorId!")
     }
@@ -351,7 +353,8 @@ private[yarn] class YarnAllocator(
     }
   }

-  private def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
+  // Visible for testing.
+  private[yarn] def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 3c224f1488..c09b01bafc 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -206,6 +206,28 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
     handler.getNumExecutorsRunning should be (2)
   }

+  test("kill executors") {
+    val handler = createAllocator(4)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2))
+
+    handler.requestTotalExecutors(1)
+    handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id) }
+
+    val statuses = Seq(container1, container2).map { c =>
+      ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0)
+    }
+    handler.updateResourceRequests()
+    handler.processCompletedContainers(statuses.toSeq)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (1)
+  }
+
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +