diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-09-10 14:34:24 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-09-10 14:34:24 -0500 |
commit | 1f4a648d4e30e837d6cf3ea8de1808e2254ad70b (patch) | |
tree | 8387579c70ae84cea70e2de0df073f07dc7e5bcd /yarn/common/src/main | |
parent | 26503fdf20f4181a2b390c88b83f364e6a4ccc21 (diff) | |
download | spark-1f4a648d4e30e837d6cf3ea8de1808e2254ad70b.tar.gz spark-1f4a648d4e30e837d6cf3ea8de1808e2254ad70b.tar.bz2 spark-1f4a648d4e30e837d6cf3ea8de1808e2254ad70b.zip |
SPARK-1713. Use a thread pool for launching executors.
This patch copies the approach used in the MapReduce application master for launching containers.
Author: Sandy Ryza <sandy@cloudera.com>
Closes #663 from sryza/sandy-spark-1713 and squashes the following commits:
036550d [Sandy Ryza] SPARK-1713. [YARN] Use a threadpool for launching executor containers
Diffstat (limited to 'yarn/common/src/main')
-rw-r--r-- | yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 14 |
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 02b9a81bf6..0b8744f4b8 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.yarn import java.util.{List => JList} -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ @@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import com.google.common.util.concurrent.ThreadFactoryBuilder + object AllocationType extends Enumeration { type AllocationType = Value val HOST, RACK, ANY = Value @@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator( protected val (preferredHostToCount, preferredRackToCount) = generateNodeToWeight(conf, preferredNodes) + private val launcherPool = new ThreadPoolExecutor( + // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue + sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE, + 1, TimeUnit.MINUTES, + new LinkedBlockingQueue[Runnable](), + new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build()) + launcherPool.allowCoreThreadTimeOut(true) + def getNumExecutorsRunning: Int = numExecutorsRunning.intValue def getNumExecutorsFailed: Int = numExecutorsFailed.intValue @@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator( executorMemory, executorCores, securityMgr) - new Thread(executorRunnable).start() + launcherPool.execute(executorRunnable) } } logDebug(""" |