author     Marcelo Vanzin <vanzin@cloudera.com>   2015-06-03 14:59:30 -0700
committer  Andrew Or <andrew@databricks.com>      2015-06-03 14:59:30 -0700
commit     aa40c4420717aa06a7964bd30b428fb73548beb2 (patch)
tree       6754cda88318a3bd7bbb88ee4d2263589927fa93 /yarn
parent     bfbf12b349e998c7e674649a07b88c4658ae0711 (diff)
[SPARK-8059] [YARN] Wake up allocation thread when new requests arrive.
This should help reduce latency for new executor allocations.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #6600 from vanzin/SPARK-8059 and squashes the following commits:

8387a3a [Marcelo Vanzin] [SPARK-8059] [yarn] Wake up allocation thread when new requests arrive.
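As a quick illustration of why this reduces latency (plain Scala, hypothetical names, no Spark APIs): a thread parked in Object.wait(timeout) can be woken early by notifyAll(), while a Thread.sleep() of the same length cannot be cut short by a new request.

// Minimal latency demo: a thread waiting with a 5 s timeout is woken
// ~100 ms in by notifyAll(). A Thread.sleep(5000) in its place would
// have held the loop for the full five seconds.
object SleepVsWait {
  def main(args: Array[String]): Unit = {
    val lock = new Object()
    val start = System.nanoTime()
    val waiter = new Thread(new Runnable {
      override def run(): Unit = lock.synchronized { lock.wait(5000) }
    })
    waiter.start()
    Thread.sleep(100)                      // a "new request" arrives 100 ms in
    lock.synchronized { lock.notifyAll() } // ends the wait early
    waiter.join()                          // even if the notify raced ahead,
                                           // the 5 s timeout still bounds this
    println(s"woke after ${(System.nanoTime() - start) / 1000000} ms")
  }
}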
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 16
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala     |  7
2 files changed, 19 insertions, 4 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 760e458972..002d7b6eaf 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -67,6 +67,7 @@ private[spark] class ApplicationMaster(
@volatile private var reporterThread: Thread = _
@volatile private var allocator: YarnAllocator = _
+ private val allocatorLock = new Object()
// Fields used in client mode.
private var rpcEnv: RpcEnv = null
@@ -359,7 +360,9 @@ private[spark] class ApplicationMaster(
}
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
s"Sleeping for $sleepInterval.")
- Thread.sleep(sleepInterval)
+ allocatorLock.synchronized {
+ allocatorLock.wait(sleepInterval)
+ }
} catch {
case e: InterruptedException =>
}
@@ -546,8 +549,15 @@ private[spark] class ApplicationMaster(
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RequestExecutors(requestedTotal) =>
Option(allocator) match {
- case Some(a) => a.requestTotalExecutors(requestedTotal)
- case None => logWarning("Container allocator is not ready to request executors yet.")
+ case Some(a) =>
+ allocatorLock.synchronized {
+ if (a.requestTotalExecutors(requestedTotal)) {
+ allocatorLock.notifyAll()
+ }
+ }
+
+ case None =>
+ logWarning("Container allocator is not ready to request executors yet.")
}
context.reply(true)
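Reduced to its essentials, the reporter-loop side of this change looks like the sketch below. ReporterLoop and its fields are illustrative names, not Spark's real classes; the real loop also tracks failures and derives its sleep interval from pending allocations.

// Sketch of the reporter-loop pattern: wait(interval) releases the lock
// while paused and can be ended early by notifyAll(); the interrupt is
// swallowed so the loop re-checks its state, as the patched code does.
class ReporterLoop(lock: AnyRef, intervalMs: Long) extends Runnable {
  @volatile var stopped = false

  override def run(): Unit = {
    while (!stopped) {
      try {
        lock.synchronized { lock.wait(intervalMs) }
      } catch {
        case _: InterruptedException => // fall through and re-check stopped
      }
      // ... poll the allocator / heartbeat the ResourceManager here ...
    }
  }
}

object ReporterLoopDemo {
  def main(args: Array[String]): Unit = {
    val lock = new Object()
    val loop = new ReporterLoop(lock, 3000)
    val t = new Thread(loop)
    t.setDaemon(true)
    t.start()
    Thread.sleep(100)
    loop.stopped = true
    lock.synchronized { lock.notifyAll() } // wake it so it observes stopped
  }
}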
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 21193e7c62..940873fbd0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -146,11 +146,16 @@ private[yarn] class YarnAllocator(
* Request as many executors from the ResourceManager as needed to reach the desired total. If
* the requested total is smaller than the current number of running executors, no executors will
* be killed.
+ *
+ * @return Whether the new requested total is different than the old value.
*/
- def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+ def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized {
if (requestedTotal != targetNumExecutors) {
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
targetNumExecutors = requestedTotal
+ true
+ } else {
+ false
}
}
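Taken together with the ApplicationMaster hunk above, the new Boolean gives callers a guarded-notify contract: wake waiters only when the target actually changed, so duplicate requests cause no spurious wake-ups. A minimal sketch, with illustrative names standing in for the real allocator:

// requestTotal returns true only on a real change, mirroring the new
// Boolean result of requestTotalExecutors; only then are waiters woken.
object GuardedNotifyDemo {
  private val lock = new Object()
  private var target = 0

  def requestTotal(requested: Int): Boolean = lock.synchronized {
    if (requested != target) {
      target = requested
      true
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    Seq(4, 4, 8).foreach { n =>
      lock.synchronized {
        if (requestTotal(n)) lock.notifyAll() // fires for 4 and 8, not the repeat
      }
      println(s"request=$n target=$target")
    }
  }
}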