author    Sandy Ryza <sandy@cloudera.com>    2014-09-10 14:34:24 -0500
committer Thomas Graves <tgraves@apache.org>    2014-09-10 14:34:24 -0500
commit    1f4a648d4e30e837d6cf3ea8de1808e2254ad70b (patch)
tree      8387579c70ae84cea70e2de0df073f07dc7e5bcd
parent    26503fdf20f4181a2b390c88b83f364e6a4ccc21 (diff)
SPARK-1713. Use a thread pool for launching executors.
This patch copies the approach used in the MapReduce application master for launching containers.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #663 from sryza/sandy-spark-1713 and squashes the following commits:

036550d [Sandy Ryza] SPARK-1713. [YARN] Use a threadpool for launching executor containers
-rw-r--r--  docs/running-on-yarn.md                                                       7
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala  14
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 943f06b114..d8b22f3663 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -125,6 +125,13 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
the environment of the executor launcher.
</td>
</tr>
+<tr>
+ <td><code>spark.yarn.containerLauncherMaxThreads</code></td>
+ <td>25</td>
+ <td>
+ The maximum number of threads to use in the application master for launching executor containers.
+ </td>
+</tr>
</table>
# Launching Spark on YARN
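The new property is an ordinary Spark configuration key, so it can be set programmatically as well as at submit time. A minimal sketch, assuming the application builds its own SparkConf (the value 50 and the app name are illustrative, not recommendations from the patch):

import org.apache.spark.SparkConf

// Raise the cap on concurrent container-launcher threads above the default of 25.
// 50 is an arbitrary example value, not a tuning recommendation.
val conf = new SparkConf()
  .setAppName("launcher-threads-demo")  // hypothetical app name
  .set("spark.yarn.containerLauncherMaxThreads", "50")

The same setting can also be passed on the command line with spark-submit --conf spark.yarn.containerLauncherMaxThreads=50.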
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 02b9a81bf6..0b8744f4b8 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy.yarn
import java.util.{List => JList}
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
@@ -32,6 +32,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
@@ -95,6 +97,14 @@ private[yarn] abstract class YarnAllocator(
  protected val (preferredHostToCount, preferredRackToCount) =
    generateNodeToWeight(conf, preferredNodes)

+  private val launcherPool = new ThreadPoolExecutor(
+    // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
+    sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
+    1, TimeUnit.MINUTES,
+    new LinkedBlockingQueue[Runnable](),
+    new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
+  launcherPool.allowCoreThreadTimeOut(true)
+
  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue

  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
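A note on why the comment in the hunk above holds: java.util.concurrent.ThreadPoolExecutor only grows beyond its core size when the work queue rejects an offered task, and an unbounded LinkedBlockingQueue never rejects, so the core size read from spark.yarn.containerLauncherMaxThreads is the effective concurrency cap and the Integer.MAX_VALUE maximum is never exercised. Below is a minimal, self-contained sketch of the same construction; the Runnable body is a stand-in for ExecutorRunnable, and LauncherPoolSketch is a hypothetical demo object, not part of the patch.

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import com.google.common.util.concurrent.ThreadFactoryBuilder

object LauncherPoolSketch extends App {
  val maxThreads = 25  // stands in for spark.yarn.containerLauncherMaxThreads

  val launcherPool = new ThreadPoolExecutor(
    maxThreads,            // core size: the real bound on concurrent launches
    Integer.MAX_VALUE,     // never reached, because the queue below is unbounded
    1, TimeUnit.MINUTES,   // idle core threads may exit after a minute...
    new LinkedBlockingQueue[Runnable](),
    new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
  launcherPool.allowCoreThreadTimeOut(true)  // ...since core-thread timeout is enabled

  // Submitting more tasks than threads parks the excess in the queue
  // rather than spawning one thread per task.
  (1 to 100).foreach { i =>
    launcherPool.execute(new Runnable {
      override def run(): Unit = println(s"launch $i on ${Thread.currentThread.getName}")
    })
  }
  launcherPool.shutdown()
  launcherPool.awaitTermination(1, TimeUnit.MINUTES)
}

Daemon threads keep a stray launch from blocking JVM shutdown, and the core-thread timeout lets the pool drain to zero threads once an allocation burst is over.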
@@ -283,7 +293,7 @@ private[yarn] abstract class YarnAllocator(
executorMemory,
executorCores,
securityMgr)
- new Thread(executorRunnable).start()
+ launcherPool.execute(executorRunnable)
}
}
logDebug("""