Diffstat (limited to 'yarn')
 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala     | 16
 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala          | 12
 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala       | 16
 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala |  8
 4 files changed, 31 insertions(+), 21 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 26dbd6237c..a12f82d2fb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.util.{Utils, IntParam, MemoryParam}
-
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
@@ -39,15 +39,17 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var appName: String = "Spark"
var priority = 0
+ parseArgs(args.toList)
+ loadEnvironmentArgs()
+
// Additional memory to allocate to containers
// For now, use driver's memory overhead as our AM container's memory overhead
- val amMemoryOverhead = sparkConf.getInt(
- "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
- val executorMemoryOverhead = sparkConf.getInt(
- "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+ val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+ math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
+
+ val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
- parseArgs(args.toList)
- loadEnvironmentArgs()
validateArgs()
/** Load any default arguments provided through environment variables and Spark properties. */
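Note: moving parseArgs() and loadEnvironmentArgs() above the overhead vals is an initialization-order fix. Scala evaluates class-body statements in order, and the new defaults are derived from amMemory and executorMemory, which those two calls populate. A minimal sketch of the new default computation (computeOverhead is an illustrative name, not part of the patch):

    // 7% of the requested container memory, floored at the old 384 MB default.
    def computeOverhead(containerMemoryMb: Int): Int =
      math.max((0.07 * containerMemoryMb).toInt, 384)

    computeOverhead(2048)  // 384 MB: 7% is only 143 MB, so the floor wins
    computeOverhead(8192)  // 573 MB: 7% exceeds the floor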
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 1cf19c1985..6ecac6eae6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -64,14 +64,18 @@ private[spark] trait ClientBase extends Logging {
s"memory capability of the cluster ($maxMem MB per container)")
val executorMem = args.executorMemory + executorMemoryOverhead
if (executorMem > maxMem) {
- throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " +
- s"is above the max threshold ($maxMem MB) of this cluster!")
+ throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
+ s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
}
val amMem = args.amMemory + amMemoryOverhead
if (amMem > maxMem) {
- throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " +
- s"is above the max threshold ($maxMem MB) of this cluster!")
+ throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
+ s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
}
+ logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
+ amMem,
+ amMemoryOverhead))
+
// We could add checks to make sure the entire cluster has enough resources but that involves
// getting all the node reports and computing ourselves.
}
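Note: the reworded messages surface the overhead term that was previously folded into a single number. With assumed values of executorMemory = 8192 MB on a cluster whose maximum container size is also 8192 MB, a hedged sketch of the failing check:

    // Illustrative values; in the real code maxMem comes from the RM's cluster metrics.
    val executorMemory = 8192
    val executorMemoryOverhead = math.max((0.07 * executorMemory).toInt, 384)  // 573 MB
    val executorMem = executorMemory + executorMemoryOverhead                  // 8765 MB
    val maxMem = 8192
    if (executorMem > maxMem) {
      // Message reads: Required executor memory (8192+573 MB) is above the max threshold (8192 MB)...
      throw new IllegalArgumentException(s"Required executor memory ($executorMemory" +
        s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
    }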
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 299e38a5eb..4f4f1d2aaa 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
object AllocationType extends Enumeration {
type AllocationType = Value
@@ -78,10 +79,6 @@ private[yarn] abstract class YarnAllocator(
// Containers to be released in next request to RM
private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
- // Additional memory overhead - in mb.
- protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
private val numPendingAllocate = new AtomicInteger()
@@ -97,6 +94,10 @@ private[yarn] abstract class YarnAllocator(
protected val (preferredHostToCount, preferredRackToCount) =
generateNodeToWeight(conf, preferredNodes)
+ // Additional memory overhead - in mb.
+ protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+
private val launcherPool = new ThreadPoolExecutor(
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
@@ -114,12 +115,11 @@ private[yarn] abstract class YarnAllocator(
// this is needed by alpha, do it here since we add numPending right after this
val executorsPending = numPendingAllocate.get()
-
if (missing > 0) {
+ val totalExecutorMemory = executorMemory + memoryOverhead
numPendingAllocate.addAndGet(missing)
- logInfo("Will Allocate %d executor containers, each with %d memory".format(
- missing,
- (executorMemory + memoryOverhead)))
+ logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
+ s"memory including $memoryOverhead MB overhead")
} else {
logDebug("Empty allocation request ...")
}
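Note: instantiating the new log line with the same assumed values (executorMemory = 8192 MB, overhead 573 MB) and missing = 3 gives: "Will allocate 3 executor containers, each with 8765 MB memory including 573 MB overhead".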
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 0b712c2019..e1e0144f46 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -84,8 +84,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
object YarnSparkHadoopUtil {
- // Additional memory overhead - in mb.
- val DEFAULT_MEMORY_OVERHEAD = 384
+ // Additional memory overhead.
+ // 7% was arrived at experimentally, in the interest of minimizing memory waste while
+ // covering the common cases. Memory overhead tends to grow with container size.
+
+ val MEMORY_OVERHEAD_FACTOR = 0.07
+ val MEMORY_OVERHEAD_MIN = 384
val ANY_HOST = "*"
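Note: the floor dominates until 7% of the container memory exceeds 384 MB, i.e. up to 384 / 0.07 ≈ 5486 MB. Containers below roughly 5.4 GB therefore keep the old 384 MB overhead, and larger ones scale linearly at 7%. A quick REPL check:

    scala> (384 / 0.07).toInt
    res0: Int = 5485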