diff options
author | Sandy Ryza <sandy@cloudera.com> | 2015-02-06 10:53:16 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-02-06 10:53:16 -0800 |
commit | 1a88f20de798030a7d5713bd267f612ba5617fca (patch) | |
tree | 419bc288f750ee72b02597d3bee4e375c2be7a5a /yarn/src | |
parent | cc6e53119d7a51b95b19244f50b25814088b4d11 (diff) | |
download | spark-1a88f20de798030a7d5713bd267f612ba5617fca.tar.gz spark-1a88f20de798030a7d5713bd267f612ba5617fca.tar.bz2 spark-1a88f20de798030a7d5713bd267f612ba5617fca.zip |
SPARK-4337. [YARN] Add ability to cancel pending requests
Author: Sandy Ryza <sandy@cloudera.com>
Closes #4141 from sryza/sandy-spark-4337 and squashes the following commits:
a98bd20 [Sandy Ryza] Andrew's comments
cdaab7f [Sandy Ryza] SPARK-4337. Add ability to cancel pending requests to YARN
Diffstat (limited to 'yarn/src')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 65 | ||||
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 54 |
2 files changed, 89 insertions, 30 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 0dbb6154b3..12c62a659d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -69,8 +69,7 @@ private[yarn] class YarnAllocator( } // Visible for testing. - val allocatedHostToContainersMap = - new HashMap[String, collection.mutable.Set[ContainerId]] + val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]] val allocatedContainerToHostMap = new HashMap[ContainerId, String] // Containers that we no longer care about. We've either already told the RM to release them or @@ -84,7 +83,7 @@ private[yarn] class YarnAllocator( private var executorIdCounter = 0 @volatile private var numExecutorsFailed = 0 - @volatile private var maxExecutors = args.numExecutors + @volatile private var targetNumExecutors = args.numExecutors // Keep track of which container is running which executor to remove the executors later private val executorIdToContainer = new HashMap[String, Container] @@ -133,10 +132,12 @@ private[yarn] class YarnAllocator( amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).map(_.size).sum /** - * Request as many executors from the ResourceManager as needed to reach the desired total. + * Request as many executors from the ResourceManager as needed to reach the desired total. If + * the requested total is smaller than the current number of running executors, no executors will + * be killed. */ def requestTotalExecutors(requestedTotal: Int): Unit = synchronized { - maxExecutors = requestedTotal + targetNumExecutors = requestedTotal } /** @@ -147,8 +148,8 @@ private[yarn] class YarnAllocator( val container = executorIdToContainer.remove(executorId).get internalReleaseContainer(container) numExecutorsRunning -= 1 - maxExecutors -= 1 - assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!") + targetNumExecutors -= 1 + assert(targetNumExecutors >= 0, "Allocator killed more executors than are allocated!") } else { logWarning(s"Attempted to kill unknown executor $executorId!") } @@ -163,15 +164,8 @@ private[yarn] class YarnAllocator( * This must be synchronized because variables read in this method are mutated by other methods. */ def allocateResources(): Unit = synchronized { - val numPendingAllocate = getNumPendingAllocate - val missing = maxExecutors - numPendingAllocate - numExecutorsRunning + updateResourceRequests() - if (missing > 0) { - logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " + - s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead") - } - - addResourceRequests(missing) val progressIndicator = 0.1f // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container // requests. @@ -201,15 +195,36 @@ private[yarn] class YarnAllocator( } /** - * Request numExecutors additional containers from YARN. Visible for testing. + * Update the set of container requests that we will sync with the RM based on the number of + * executors we have currently running and our target number of executors. + * + * Visible for testing. */ - def addResourceRequests(numExecutors: Int): Unit = { - for (i <- 0 until numExecutors) { - val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY) - amClient.addContainerRequest(request) - val nodes = request.getNodes - val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last - logInfo("Container request (host: %s, capability: %s".format(hostStr, resource)) + def updateResourceRequests(): Unit = { + val numPendingAllocate = getNumPendingAllocate + val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning + + if (missing > 0) { + logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " + + s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead") + + for (i <- 0 until missing) { + val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY) + amClient.addContainerRequest(request) + val nodes = request.getNodes + val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last + logInfo(s"Container request (host: $hostStr, capability: $resource)") + } + } else if (missing < 0) { + val numToCancel = math.min(numPendingAllocate, -missing) + logInfo(s"Canceling requests for $numToCancel executor containers") + + val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource) + if (!matchingRequests.isEmpty) { + matchingRequests.head.take(numToCancel).foreach(amClient.removeContainerRequest) + } else { + logWarning("Expected to find pending requests, but found none.") + } } } @@ -266,7 +281,7 @@ private[yarn] class YarnAllocator( * containersToUse or remaining. * * @param allocatedContainer container that was given to us by YARN - * @location resource name, either a node, rack, or * + * @param location resource name, either a node, rack, or * * @param containersToUse list of containers that will be used * @param remaining list of containers that will not be used */ @@ -294,7 +309,7 @@ private[yarn] class YarnAllocator( private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = { for (container <- containersToUse) { numExecutorsRunning += 1 - assert(numExecutorsRunning <= maxExecutors) + assert(numExecutorsRunning <= targetNumExecutors) val executorHostname = container.getNodeId.getHost val containerId = container.getId executorIdCounter += 1 diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 024b25f9d3..3c224f1488 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -107,8 +107,8 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach test("single container allocated") { // request a single container and receive it - val handler = createAllocator() - handler.addResourceRequests(1) + val handler = createAllocator(1) + handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) handler.getNumPendingAllocate should be (1) @@ -123,8 +123,8 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach test("some containers allocated") { // request a few containers and receive some of them - val handler = createAllocator() - handler.addResourceRequests(4) + val handler = createAllocator(4) + handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) handler.getNumPendingAllocate should be (4) @@ -144,7 +144,7 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach test("receive more containers than requested") { val handler = createAllocator(2) - handler.addResourceRequests(2) + handler.updateResourceRequests() handler.getNumExecutorsRunning should be (0) handler.getNumPendingAllocate should be (2) @@ -162,6 +162,50 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach handler.allocatedHostToContainersMap.contains("host4") should be (false) } + test("decrease total requested executors") { + val handler = createAllocator(4) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getNumPendingAllocate should be (4) + + handler.requestTotalExecutors(3) + handler.updateResourceRequests() + handler.getNumPendingAllocate should be (3) + + val container = createContainer("host1") + handler.handleAllocatedContainers(Array(container)) + + handler.getNumExecutorsRunning should be (1) + handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId) + + handler.requestTotalExecutors(2) + handler.updateResourceRequests() + handler.getNumPendingAllocate should be (1) + } + + test("decrease total requested executors to less than currently running") { + val handler = createAllocator(4) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getNumPendingAllocate should be (4) + + handler.requestTotalExecutors(3) + handler.updateResourceRequests() + handler.getNumPendingAllocate should be (3) + + val container1 = createContainer("host1") + val container2 = createContainer("host2") + handler.handleAllocatedContainers(Array(container1, container2)) + + handler.getNumExecutorsRunning should be (2) + + handler.requestTotalExecutors(1) + handler.updateResourceRequests() + handler.getNumPendingAllocate should be (0) + handler.getNumExecutorsRunning should be (2) + } + test("memory exceeded diagnostic regexes") { val diagnostics = "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " + |