author    Nishkam Ravi <nravi@cloudera.com>  2014-10-02 13:48:35 -0500
committer Thomas Graves <tgraves@apache.org>  2014-10-02 13:48:35 -0500
commit    b4fb7b80a0d863500943d788ad3e34d502a6dafa (patch)
tree      2f2057289e9c37ae7579a01c4cdd65052bd348c4
parent    82a6a083a485140858bcd93d73adec59bb5cca64 (diff)
Modify default YARN memory_overhead-- from an additive constant to a multiplier
Redone against the recent master branch (https://github.com/apache/spark/pull/1391)

Author: Nishkam Ravi <nravi@cloudera.com>
Author: nravi <nravi@c1704.halxg.cloudera.com>
Author: nishkamravi2 <nishkamravi@gmail.com>

Closes #2485 from nishkamravi2/master_nravi and squashes the following commits:

636a9ff [nishkamravi2] Update YarnAllocator.scala
8f76c8b [Nishkam Ravi] Doc change for yarn memory overhead
35daa64 [Nishkam Ravi] Slight change in the doc for yarn memory overhead
5ac2ec1 [Nishkam Ravi] Remove out
dac1047 [Nishkam Ravi] Additional documentation for yarn memory overhead issue
42c2c3d [Nishkam Ravi] Additional changes for yarn memory overhead issue
362da5e [Nishkam Ravi] Additional changes for yarn memory overhead
c726bd9 [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark into master_nravi
f00fa31 [Nishkam Ravi] Improving logging for AM memoryOverhead
1cf2d1e [nishkamravi2] Update YarnAllocator.scala
ebcde10 [Nishkam Ravi] Modify default YARN memory_overhead-- from an additive constant to a multiplier (redone to resolve merge conflicts)
2e69f11 [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark into master_nravi
efd688a [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark
2b630f9 [nravi] Accept memory input as "30g", "512M" instead of an int value, to be consistent with rest of Spark
3bf8fad [nravi] Merge branch 'master' of https://github.com/apache/spark
5423a03 [nravi] Merge branch 'master' of https://github.com/apache/spark
eb663ca [nravi] Merge branch 'master' of https://github.com/apache/spark
df2aeb1 [nravi] Improved fix for ConcurrentModificationIssue (Spark-1097, Hadoop-10456)
6b840f0 [nravi] Undo the fix for SPARK-1758 (the problem is fixed)
5108700 [nravi] Fix in Spark for the Concurrent thread modification issue (SPARK-1097, HADOOP-10456)
681b36f [nravi] Fix for SPARK-1758: failing test org.apache.spark.JavaAPISuite.wholeTextFiles
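In essence, the default overhead moves from a flat 384 MB to 7% of the container memory, with 384 MB kept as a floor. A minimal Scala sketch of the computation, using the constants introduced in YarnSparkHadoopUtil below (defaultOverhead is an illustrative helper name, not part of the patch; the patch inlines this expression at each call site):

    val MEMORY_OVERHEAD_FACTOR = 0.07   // fraction of container memory reserved as overhead
    val MEMORY_OVERHEAD_MIN = 384       // floor in MB, equal to the old additive constant
    // Default off-heap overhead for a container of the given size (MB)
    def defaultOverhead(containerMemoryMB: Int): Int =
      math.max((MEMORY_OVERHEAD_FACTOR * containerMemoryMB).toInt, MEMORY_OVERHEAD_MIN)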
-rw-r--r--  docs/running-on-yarn.md  8
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala  16
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala  12
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala  16
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala  8
5 files changed, 35 insertions, 25 deletions
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4b3a49eca7..695813a2ba 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -79,16 +79,16 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
</tr>
<tr>
<td><code>spark.yarn.executor.memoryOverhead</code></td>
- <td>384</td>
+ <td>executorMemory * 0.07, with minimum of 384 </td>
<td>
- The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+ The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.yarn.driver.memoryOverhead</code></td>
- <td>384</td>
+ <td>driverMemory * 0.07, with minimum of 384 </td>
<td>
- The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+ The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
</td>
</tr>
<tr>
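To make the new defaults concrete (illustrative numbers, not from the patch): a 4 GB executor gets max(0.07 * 4096, 384) = max(286, 384) = 384 MB of overhead, the same as the old fixed default, while a 20 GB executor gets max(0.07 * 20480, 384) = 1433 MB, so its YARN container request grows to 21913 MB rather than the 20864 MB the old constant would have produced.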
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 26dbd6237c..a12f82d2fb 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.util.{Utils, IntParam, MemoryParam}
-
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
@@ -39,15 +39,17 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var appName: String = "Spark"
var priority = 0
+ parseArgs(args.toList)
+ loadEnvironmentArgs()
+
// Additional memory to allocate to containers
// For now, use driver's memory overhead as our AM container's memory overhead
- val amMemoryOverhead = sparkConf.getInt(
- "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
- val executorMemoryOverhead = sparkConf.getInt(
- "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+ val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
+ math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
+
+ val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
- parseArgs(args.toList)
- loadEnvironmentArgs()
validateArgs()
/** Load any default arguments provided through environment variables and Spark properties. */
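Why parseArgs() and loadEnvironmentArgs() move above the two overhead vals (an inference from the patch, not stated in it): the new defaults read amMemory and executorMemory, which are only populated once the arguments have been parsed, so those calls must run before the overhead vals are initialized.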
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 1cf19c1985..6ecac6eae6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -64,14 +64,18 @@ private[spark] trait ClientBase extends Logging {
s"memory capability of the cluster ($maxMem MB per container)")
val executorMem = args.executorMemory + executorMemoryOverhead
if (executorMem > maxMem) {
- throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " +
- s"is above the max threshold ($maxMem MB) of this cluster!")
+ throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
+ s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
}
val amMem = args.amMemory + amMemoryOverhead
if (amMem > maxMem) {
- throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " +
- s"is above the max threshold ($maxMem MB) of this cluster!")
+ throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
+ s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
}
+ logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
+ amMem,
+ amMemoryOverhead))
+
// We could add checks to make sure the entire cluster has enough resources but that involves
// getting all the node reports and computing ourselves.
}
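With hypothetical values plugged into the new format strings, the messages would read along these lines:

    Required executor memory (2048+384 MB) is above the max threshold (2048 MB) of this cluster!
    Will allocate AM container, with 896 MB memory including 384 MB overhead

i.e. the error now reports base memory plus overhead separately instead of only their sum, and the AM allocation is logged with its overhead broken out.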
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 299e38a5eb..4f4f1d2aaa 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
object AllocationType extends Enumeration {
type AllocationType = Value
@@ -78,10 +79,6 @@ private[yarn] abstract class YarnAllocator(
// Containers to be released in next request to RM
private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
- // Additional memory overhead - in mb.
- protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
private val numPendingAllocate = new AtomicInteger()
@@ -97,6 +94,10 @@ private[yarn] abstract class YarnAllocator(
protected val (preferredHostToCount, preferredRackToCount) =
generateNodeToWeight(conf, preferredNodes)
+ // Additional memory overhead - in mb.
+ protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+
private val launcherPool = new ThreadPoolExecutor(
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
@@ -114,12 +115,11 @@ private[yarn] abstract class YarnAllocator(
// this is needed by alpha, do it here since we add numPending right after this
val executorsPending = numPendingAllocate.get()
-
if (missing > 0) {
+ val totalExecutorMemory = executorMemory + memoryOverhead
numPendingAllocate.addAndGet(missing)
- logInfo("Will Allocate %d executor containers, each with %d memory".format(
- missing,
- (executorMemory + memoryOverhead)))
+ logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
+ s"memory including $memoryOverhead MB overhead")
} else {
logDebug("Empty allocation request ...")
}
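Both overheads remain user-settable; an explicit value bypasses the 7% default entirely since it is only the fallback argument to sparkConf.getInt. One illustrative way to pin them (values are arbitrary):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.yarn.executor.memoryOverhead", "1024")  // MB; used verbatim instead of max(0.07 * executorMemory, 384)
      .set("spark.yarn.driver.memoryOverhead", "512")     // MB; also used for the AM container, per ClientArguments above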
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 0b712c2019..e1e0144f46 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -84,8 +84,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
object YarnSparkHadoopUtil {
- // Additional memory overhead - in mb.
- val DEFAULT_MEMORY_OVERHEAD = 384
+ // Additional memory overhead
+ // 7% was arrived at experimentally. In the interest of minimizing memory waste while covering
+ // the common cases. Memory overhead tends to grow with container size.
+
+ val MEMORY_OVERHEAD_FACTOR = 0.07
+ val MEMORY_OVERHEAD_MIN = 384
val ANY_HOST = "*"