author     Nishkam Ravi <nravi@cloudera.com>        2014-10-02 13:48:35 -0500
committer  Thomas Graves <tgraves@apache.org>       2014-10-02 13:48:35 -0500
commit     b4fb7b80a0d863500943d788ad3e34d502a6dafa (patch)
tree       2f2057289e9c37ae7579a01c4cdd65052bd348c4 /yarn
parent     82a6a083a485140858bcd93d73adec59bb5cca64 (diff)
Modify default YARN memory_overhead from an additive constant to a multiplier
Redone against the recent master branch (https://github.com/apache/spark/pull/1391)

Author: Nishkam Ravi <nravi@cloudera.com>
Author: nravi <nravi@c1704.halxg.cloudera.com>
Author: nishkamravi2 <nishkamravi@gmail.com>

Closes #2485 from nishkamravi2/master_nravi and squashes the following commits:

636a9ff [nishkamravi2] Update YarnAllocator.scala
8f76c8b [Nishkam Ravi] Doc change for yarn memory overhead
35daa64 [Nishkam Ravi] Slight change in the doc for yarn memory overhead
5ac2ec1 [Nishkam Ravi] Remove out
dac1047 [Nishkam Ravi] Additional documentation for yarn memory overhead issue
42c2c3d [Nishkam Ravi] Additional changes for yarn memory overhead issue
362da5e [Nishkam Ravi] Additional changes for yarn memory overhead
c726bd9 [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark into master_nravi
f00fa31 [Nishkam Ravi] Improving logging for AM memoryOverhead
1cf2d1e [nishkamravi2] Update YarnAllocator.scala
ebcde10 [Nishkam Ravi] Modify default YARN memory_overhead-- from an additive constant to a multiplier (redone to resolve merge conflicts)
2e69f11 [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark into master_nravi
efd688a [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark
2b630f9 [nravi] Accept memory input as "30g", "512M" instead of an int value, to be consistent with rest of Spark
3bf8fad [nravi] Merge branch 'master' of https://github.com/apache/spark
5423a03 [nravi] Merge branch 'master' of https://github.com/apache/spark
eb663ca [nravi] Merge branch 'master' of https://github.com/apache/spark
df2aeb1 [nravi] Improved fix for ConcurrentModificationIssue (Spark-1097, Hadoop-10456)
6b840f0 [nravi] Undo the fix for SPARK-1758 (the problem is fixed)
5108700 [nravi] Fix in Spark for the Concurrent thread modification issue (SPARK-1097, HADOOP-10456)
681b36f [nravi] Fix for SPARK-1758: failing test org.apache.spark.JavaAPISuite.wholeTextFiles
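In short, the default overhead moves from a fixed 384 MB to max(7% of container memory, 384 MB). A minimal sketch of the new default (illustrative, using the constants this patch introduces; an explicit spark.yarn.driver.memoryOverhead or spark.yarn.executor.memoryOverhead setting still takes precedence, as before):

  val MEMORY_OVERHEAD_FACTOR = 0.07
  val MEMORY_OVERHEAD_MIN = 384

  // Default overhead grows with the container, but never drops below 384 MB.
  def defaultOverhead(containerMemoryMb: Int): Int =
    math.max((MEMORY_OVERHEAD_FACTOR * containerMemoryMb).toInt, MEMORY_OVERHEAD_MIN)

  defaultOverhead(4096)   // 384:  7% of 4 GB is only 286 MB, so the floor applies
  defaultOverhead(16384)  // 1146: 7% of 16 GB exceeds the floor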
Diffstat (limited to 'yarn')
 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala     | 16
 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala          | 12
 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala       | 16
 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala |  8
 4 files changed, 31 insertions(+), 21 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 26dbd6237c..a12f82d2fb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.util.{Utils, IntParam, MemoryParam}
-
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
@@ -39,15 +39,17 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var appName: String = "Spark"
var priority = 0
+ parseArgs(args.toList)
+ loadEnvironmentArgs()
+
// Additional memory to allocate to containers
// For now, use driver's memory overhead as our AM container's memory overhead
- val amMemoryOverhead = sparkConf.getInt(
- "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
- val executorMemoryOverhead = sparkConf.getInt(
- "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+ val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+ math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
+
+ val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
- parseArgs(args.toList)
- loadEnvironmentArgs()
validateArgs()
/** Load any default arguments provided through environment variables and Spark properties. */
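Note that moving parseArgs and loadEnvironmentArgs above the overhead vals is essential, not cosmetic: the overheads are now derived from amMemory and executorMemory, so parsing must finish before those vals initialize. A stripped-down sketch of the ordering constraint (hypothetical class, not the patch itself):

  class Args(requested: Int) {
    var executorMemory = 1024                 // default until parsing overrides it
    parseArgs()                               // runs before the val below initializes
    val executorMemoryOverhead =
      math.max((0.07 * executorMemory).toInt, 384)
    private def parseArgs(): Unit = { executorMemory = requested }
  }

  new Args(8192).executorMemoryOverhead       // 573, from the parsed value, not the default

Had the vals stayed above the parseArgs call, they would have been computed from the pre-parse defaults; with the old constant default, the order never mattered.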
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 1cf19c1985..6ecac6eae6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -64,14 +64,18 @@ private[spark] trait ClientBase extends Logging {
s"memory capability of the cluster ($maxMem MB per container)")
val executorMem = args.executorMemory + executorMemoryOverhead
if (executorMem > maxMem) {
- throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " +
- s"is above the max threshold ($maxMem MB) of this cluster!")
+ throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
+ s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
}
val amMem = args.amMemory + amMemoryOverhead
if (amMem > maxMem) {
- throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " +
- s"is above the max threshold ($maxMem MB) of this cluster!")
+ throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
+ s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
}
+ logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
+ amMem,
+ amMemoryOverhead))
+
// We could add checks to make sure the entire cluster has enough resources but that involves
// getting all the node reports and computing ourselves.
}
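The reworked messages split the request into its base and overhead terms, so it is clear which component pushed a container past the ResourceManager limit. A hedged, standalone sketch of the executor check (assuming maxMem is the RM's maximum container allocation):

  def verifyExecutorMemory(executorMemory: Int, overhead: Int, maxMem: Int): Unit = {
    val executorMem = executorMemory + overhead
    if (executorMem > maxMem) {
      throw new IllegalArgumentException(s"Required executor memory ($executorMemory" +
        s"+$overhead MB) is above the max threshold ($maxMem MB) of this cluster!")
    }
  }

  verifyExecutorMemory(8192, 573, 8192)
  // IllegalArgumentException: Required executor memory (8192+573 MB) is above
  // the max threshold (8192 MB) of this cluster!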
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 299e38a5eb..4f4f1d2aaa 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
object AllocationType extends Enumeration {
type AllocationType = Value
@@ -78,10 +79,6 @@ private[yarn] abstract class YarnAllocator(
// Containers to be released in next request to RM
private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
- // Additional memory overhead - in mb.
- protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
private val numPendingAllocate = new AtomicInteger()
@@ -97,6 +94,10 @@ private[yarn] abstract class YarnAllocator(
protected val (preferredHostToCount, preferredRackToCount) =
generateNodeToWeight(conf, preferredNodes)
+ // Additional memory overhead - in mb.
+ protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+
private val launcherPool = new ThreadPoolExecutor(
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
@@ -114,12 +115,11 @@ private[yarn] abstract class YarnAllocator(
// this is needed by alpha, do it here since we add numPending right after this
val executorsPending = numPendingAllocate.get()
-
if (missing > 0) {
+ val totalExecutorMemory = executorMemory + memoryOverhead
numPendingAllocate.addAndGet(missing)
- logInfo("Will Allocate %d executor containers, each with %d memory".format(
- missing,
- (executorMemory + memoryOverhead)))
+ logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
+ s"memory including $memoryOverhead MB overhead")
} else {
logDebug("Empty allocation request ...")
}
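With the multiplier in place, the allocation log line reports both the total container size and the overhead share. Run in isolation with illustrative values, the new message comes out as:

  val executorMemory = 8192
  val memoryOverhead = math.max((0.07 * executorMemory).toInt, 384)   // 573
  val totalExecutorMemory = executorMemory + memoryOverhead           // 8765
  println(s"Will allocate 3 executor containers, each with $totalExecutorMemory MB " +
    s"memory including $memoryOverhead MB overhead")
  // Will allocate 3 executor containers, each with 8765 MB memory including 573 MB overhead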
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 0b712c2019..e1e0144f46 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -84,8 +84,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
object YarnSparkHadoopUtil {
- // Additional memory overhead - in mb.
- val DEFAULT_MEMORY_OVERHEAD = 384
+ // Additional memory overhead
+ // 7% was arrived at experimentally, in the interest of minimizing memory waste while covering
+ // the common cases. Memory overhead tends to grow with container size.
+
+ val MEMORY_OVERHEAD_FACTOR = 0.07
+ val MEMORY_OVERHEAD_MIN = 384
val ANY_HOST = "*"
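To make the constants concrete, here is the default overhead at a few container sizes, old constant versus new formula (illustrative arithmetic; toInt truncates):

  Seq(1024, 8192, 24576).foreach { mem =>
    val overhead = math.max((0.07 * mem).toInt, 384)
    println(f"$mem%6d MB container -> $overhead%4d MB overhead (old default: 384 MB)")
  }
  //   1024 MB container ->  384 MB overhead (old default: 384 MB)
  //   8192 MB container ->  573 MB overhead (old default: 384 MB)
  //  24576 MB container -> 1720 MB overhead (old default: 384 MB)

Small containers keep the old 384 MB via the floor; larger containers get proportionally more headroom, matching the observation that overhead tends to grow with container size.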