author    Sandy Ryza <sandy@cloudera.com>    2015-02-06 10:53:16 -0800
committer Andrew Or <andrew@databricks.com>  2015-02-06 10:53:16 -0800
commit    1a88f20de798030a7d5713bd267f612ba5617fca (patch)
tree      419bc288f750ee72b02597d3bee4e375c2be7a5a /yarn
parent    cc6e53119d7a51b95b19244f50b25814088b4d11 (diff)
SPARK-4337. [YARN] Add ability to cancel pending requests
Author: Sandy Ryza <sandy@cloudera.com>

Closes #4141 from sryza/sandy-spark-4337 and squashes the following commits:

a98bd20 [Sandy Ryza] Andrew's comments
cdaab7f [Sandy Ryza] SPARK-4337. Add ability to cancel pending requests to YARN
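The core of the patch is a reconciliation step: on each allocation cycle the allocator compares its target number of executors against the sum of pending requests and running executors, then either adds or cancels container requests to close the gap, never killing running executors. Below is a minimal, self-contained sketch of that arithmetic; the reconcile helper, its tuple return value, and the object name are illustrative inventions rather than YarnAllocator code, which drives AMRMClient directly instead of returning counts.

object RequestReconciliationSketch {
  /** Returns (requestsToAdd, requestsToCancel) for one allocation cycle. */
  def reconcile(targetNumExecutors: Int,
                numPendingAllocate: Int,
                numExecutorsRunning: Int): (Int, Int) = {
    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
    if (missing > 0) {
      (missing, 0)                                  // ask the RM for more containers
    } else if (missing < 0) {
      (0, math.min(numPendingAllocate, -missing))   // only pending requests can be canceled
    } else {
      (0, 0)
    }
  }

  def main(args: Array[String]): Unit = {
    // Nothing running or pending yet: request up to the target.
    println(reconcile(targetNumExecutors = 4, numPendingAllocate = 0, numExecutorsRunning = 0)) // (4,0)
    // Lowering the target cancels pending requests...
    println(reconcile(targetNumExecutors = 3, numPendingAllocate = 4, numExecutorsRunning = 0)) // (0,1)
    // ...but never more than are pending, so running executors are untouched.
    println(reconcile(targetNumExecutors = 1, numPendingAllocate = 1, numExecutorsRunning = 2)) // (0,1)
  }
}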
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala       65
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala  54
2 files changed, 89 insertions(+), 30 deletions(-)
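The cancellation path in the YarnAllocator.scala hunk below goes through Hadoop's AMRMClient: outstanding requests are looked up by priority, location, and capability, then removed one by one. The following is a hedged sketch of that interaction in isolation. It assumes a client, priority, and resource configured the way YarnAllocator sets them up, and assumes "*" is the value of the ANY_HOST constant referenced in the patch; it is an illustration, not the patch itself.

import scala.collection.JavaConversions._
import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

object PendingRequestCancellationSketch {
  // Cancels up to numToCancel outstanding (not yet granted) container requests.
  def cancelPendingRequests(amClient: AMRMClient[ContainerRequest],
                            priority: Priority,
                            resource: Resource,
                            numToCancel: Int): Unit = {
    // Requests made with the "*" (any-host) location are grouped together by capability,
    // so the first group holds the unconstrained requests the allocator issued.
    val matchingRequests = amClient.getMatchingRequests(priority, "*", resource)
    if (!matchingRequests.isEmpty) {
      matchingRequests.head.take(numToCancel).foreach(amClient.removeContainerRequest)
    }
  }
}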
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 0dbb6154b3..12c62a659d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -69,8 +69,7 @@ private[yarn] class YarnAllocator(
}
// Visible for testing.
- val allocatedHostToContainersMap =
- new HashMap[String, collection.mutable.Set[ContainerId]]
+ val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]
val allocatedContainerToHostMap = new HashMap[ContainerId, String]
// Containers that we no longer care about. We've either already told the RM to release them or
@@ -84,7 +83,7 @@ private[yarn] class YarnAllocator(
private var executorIdCounter = 0
@volatile private var numExecutorsFailed = 0
- @volatile private var maxExecutors = args.numExecutors
+ @volatile private var targetNumExecutors = args.numExecutors
// Keep track of which container is running which executor to remove the executors later
private val executorIdToContainer = new HashMap[String, Container]
@@ -133,10 +132,12 @@ private[yarn] class YarnAllocator(
amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).map(_.size).sum
/**
- * Request as many executors from the ResourceManager as needed to reach the desired total.
+ * Request as many executors from the ResourceManager as needed to reach the desired total. If
+ * the requested total is smaller than the current number of running executors, no executors will
+ * be killed.
*/
def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
- maxExecutors = requestedTotal
+ targetNumExecutors = requestedTotal
}
/**
@@ -147,8 +148,8 @@ private[yarn] class YarnAllocator(
val container = executorIdToContainer.remove(executorId).get
internalReleaseContainer(container)
numExecutorsRunning -= 1
- maxExecutors -= 1
- assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
+ targetNumExecutors -= 1
+ assert(targetNumExecutors >= 0, "Allocator killed more executors than are allocated!")
} else {
logWarning(s"Attempted to kill unknown executor $executorId!")
}
@@ -163,15 +164,8 @@ private[yarn] class YarnAllocator(
* This must be synchronized because variables read in this method are mutated by other methods.
*/
def allocateResources(): Unit = synchronized {
- val numPendingAllocate = getNumPendingAllocate
- val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
+ updateResourceRequests()
- if (missing > 0) {
- logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
- s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
- }
-
- addResourceRequests(missing)
val progressIndicator = 0.1f
// Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
// requests.
@@ -201,15 +195,36 @@ private[yarn] class YarnAllocator(
}
/**
- * Request numExecutors additional containers from YARN. Visible for testing.
+ * Update the set of container requests that we will sync with the RM based on the number of
+ * executors we have currently running and our target number of executors.
+ *
+ * Visible for testing.
*/
- def addResourceRequests(numExecutors: Int): Unit = {
- for (i <- 0 until numExecutors) {
- val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
- amClient.addContainerRequest(request)
- val nodes = request.getNodes
- val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
- logInfo("Container request (host: %s, capability: %s".format(hostStr, resource))
+ def updateResourceRequests(): Unit = {
+ val numPendingAllocate = getNumPendingAllocate
+ val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
+
+ if (missing > 0) {
+ logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
+ s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
+
+ for (i <- 0 until missing) {
+ val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
+ amClient.addContainerRequest(request)
+ val nodes = request.getNodes
+ val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+ logInfo(s"Container request (host: $hostStr, capability: $resource)")
+ }
+ } else if (missing < 0) {
+ val numToCancel = math.min(numPendingAllocate, -missing)
+ logInfo(s"Canceling requests for $numToCancel executor containers")
+
+ val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
+ if (!matchingRequests.isEmpty) {
+ matchingRequests.head.take(numToCancel).foreach(amClient.removeContainerRequest)
+ } else {
+ logWarning("Expected to find pending requests, but found none.")
+ }
}
}
@@ -266,7 +281,7 @@ private[yarn] class YarnAllocator(
* containersToUse or remaining.
*
* @param allocatedContainer container that was given to us by YARN
- * @location resource name, either a node, rack, or *
+ * @param location resource name, either a node, rack, or *
* @param containersToUse list of containers that will be used
* @param remaining list of containers that will not be used
*/
@@ -294,7 +309,7 @@ private[yarn] class YarnAllocator(
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
for (container <- containersToUse) {
numExecutorsRunning += 1
- assert(numExecutorsRunning <= maxExecutors)
+ assert(numExecutorsRunning <= targetNumExecutors)
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
executorIdCounter += 1
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 024b25f9d3..3c224f1488 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -107,8 +107,8 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
test("single container allocated") {
// request a single container and receive it
- val handler = createAllocator()
- handler.addResourceRequests(1)
+ val handler = createAllocator(1)
+ handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (1)
@@ -123,8 +123,8 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
test("some containers allocated") {
// request a few containers and receive some of them
- val handler = createAllocator()
- handler.addResourceRequests(4)
+ val handler = createAllocator(4)
+ handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)
@@ -144,7 +144,7 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
test("receive more containers than requested") {
val handler = createAllocator(2)
- handler.addResourceRequests(2)
+ handler.updateResourceRequests()
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (2)
@@ -162,6 +162,50 @@ class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach
handler.allocatedHostToContainersMap.contains("host4") should be (false)
}
+ test("decrease total requested executors") {
+ val handler = createAllocator(4)
+ handler.updateResourceRequests()
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumPendingAllocate should be (4)
+
+ handler.requestTotalExecutors(3)
+ handler.updateResourceRequests()
+ handler.getNumPendingAllocate should be (3)
+
+ val container = createContainer("host1")
+ handler.handleAllocatedContainers(Array(container))
+
+ handler.getNumExecutorsRunning should be (1)
+ handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+ handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+
+ handler.requestTotalExecutors(2)
+ handler.updateResourceRequests()
+ handler.getNumPendingAllocate should be (1)
+ }
+
+ test("decrease total requested executors to less than currently running") {
+ val handler = createAllocator(4)
+ handler.updateResourceRequests()
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumPendingAllocate should be (4)
+
+ handler.requestTotalExecutors(3)
+ handler.updateResourceRequests()
+ handler.getNumPendingAllocate should be (3)
+
+ val container1 = createContainer("host1")
+ val container2 = createContainer("host2")
+ handler.handleAllocatedContainers(Array(container1, container2))
+
+ handler.getNumExecutorsRunning should be (2)
+
+ handler.requestTotalExecutors(1)
+ handler.updateResourceRequests()
+ handler.getNumPendingAllocate should be (0)
+ handler.getNumExecutorsRunning should be (2)
+ }
+
test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +