about summary refs log tree commit diff
path: root/yarn/src
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2015-12-03 11:06:25 -0800
committerShixiong Zhu <shixiong@databricks.com>2015-12-03 11:06:25 -0800
commit649be4fa4532dcd3001df8345f9f7e970a3fbc65 (patch)
tree83485ced8bca30e431d07164721b2fdd6688f61b /yarn/src
parent7bc9e1db2c47387ee693bcbeb4a8a2cbe11909cf (diff)
downloadspark-649be4fa4532dcd3001df8345f9f7e970a3fbc65.tar.gz
spark-649be4fa4532dcd3001df8345f9f7e970a3fbc65.tar.bz2
spark-649be4fa4532dcd3001df8345f9f7e970a3fbc65.zip
[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient
`SynchronousQueue` cannot cache any task. This issue is similar to #9978. It's an easy fix. Just use the fixed `ThreadUtils.newDaemonCachedThreadPool`. Author: Shixiong Zhu <shixiong@databricks.com> Closes #10108 from zsxwing/fix-threadpool.
Diffstat (limited to 'yarn/src')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 14
1 file changed, 4 insertions, 10 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 73cd9031f0..4e044aa478 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -25,8 +25,6 @@ import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConverters._
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
@@ -40,7 +38,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -117,13 +115,9 @@ private[yarn] class YarnAllocator(
// Resource capability requested for each executors
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
- private val launcherPool = new ThreadPoolExecutor(
- // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
- sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
- 1, TimeUnit.MINUTES,
- new LinkedBlockingQueue[Runnable](),
- new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
- launcherPool.allowCoreThreadTimeOut(true)
+ private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
+ "ContainerLauncher",
+ sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25))
// For testing
private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)