-rw-r--r--  docs/running-on-yarn.md                                                        7
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala   14
2 files changed, 19 insertions, 2 deletions
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 943f06b114..d8b22f3663 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -125,6 +125,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
the environment of the executor launcher.
</td>
</tr>
+<tr>
+ <td><code>spark.yarn.containerLauncherMaxThreads</code></td>
+ <td>25</td>
+ <td>
+ The maximum number of threads to use in the application master for launching executor containers.
+ </td>
+</tr>
</table>
# Launching Spark on YARN
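
The `spark.yarn.containerLauncherMaxThreads` entry added above is an ordinary Spark configuration key, so it can be set like any other property, either programmatically or via `spark-submit`'s `--conf` flag. A minimal sketch (not part of this patch; the application name and the value 64 are purely illustrative) of setting it from application code:

```scala
import org.apache.spark.SparkConf

// Raise the container-launch parallelism from the default of 25 to 64.
// The key name comes from the docs entry added above; everything else
// here is illustrative only.
val conf = new SparkConf()
  .setAppName("container-launcher-example")
  .set("spark.yarn.containerLauncherMaxThreads", "64")
```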
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 02b9a81bf6..0b8744f4b8 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy.yarn
import java.util.{List => JList}
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
@@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
@@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator(
protected val (preferredHostToCount, preferredRackToCount) =
generateNodeToWeight(conf, preferredNodes)
+ private val launcherPool = new ThreadPoolExecutor(
+ // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
+ sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
+ 1, TimeUnit.MINUTES,
+ new LinkedBlockingQueue[Runnable](),
+ new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
+ launcherPool.allowCoreThreadTimeOut(true)
+
def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
@@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator(
executorMemory,
executorCores,
securityMgr)
- new Thread(executorRunnable).start()
+ launcherPool.execute(executorRunnable)
}
}
logDebug("""
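
The net effect of the YarnAllocator changes is that executor containers are no longer launched on ad-hoc `new Thread(...)` instances but through a named, daemon, size-limited pool. The standalone sketch below (not Spark code; the task bodies and counts are invented for illustration) shows why the pool behaves as the inline comment in the patch says: with an unbounded `LinkedBlockingQueue`, a `ThreadPoolExecutor` never grows past its core size, so the `Integer.MAX_VALUE` maximum is effectively unused, and `allowCoreThreadTimeOut(true)` lets idle launcher threads exit after the one-minute keep-alive.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import com.google.common.util.concurrent.ThreadFactoryBuilder

object LauncherPoolSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25).
    val maxThreads = 25

    val launcherPool = new ThreadPoolExecutor(
      maxThreads,           // core size: the effective upper bound on threads
      Integer.MAX_VALUE,    // never reached, because the queue below is unbounded
      1, TimeUnit.MINUTES,  // idle core threads are reclaimed after a minute...
      new LinkedBlockingQueue[Runnable](),
      new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
    launcherPool.allowCoreThreadTimeOut(true)  // ...because of this call

    // Submitting more tasks than threads simply queues the extras, instead of
    // spawning one raw thread per container as the old new Thread(...) code did.
    (1 to 100).foreach { i =>
      launcherPool.execute(new Runnable {
        override def run(): Unit =
          println(s"launching container $i on ${Thread.currentThread().getName}")
      })
    }

    launcherPool.shutdown()
    launcherPool.awaitTermination(1, TimeUnit.MINUTES)
  }
}
```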