author     Andrew Or <andrew@databricks.com>    2014-11-05 15:42:05 -0800
committer  Andrew Or <andrew@databricks.com>    2014-11-05 15:42:05 -0800
commit     61a5cced049a8056292ba94f23fa7bd040f50685 (patch)
tree       179e7397473254090c47e295be842185e22f652d /core/src
parent     f37817b18a479839b2e6118cc1cbd1059a94db52 (diff)
[SPARK-3797] Run external shuffle service in Yarn NM
This creates a new module `network/yarn` that depends on `network/shuffle`, recently created in #3001. This PR introduces a custom Yarn auxiliary service that runs the external shuffle service. As of the changes here, this shuffle service is required for using dynamic allocation with Spark.

This is still WIP mainly because it doesn't handle security yet. I have tested this on a stable Yarn cluster.

Author: Andrew Or <andrew@databricks.com>

Closes #3082 from andrewor14/yarn-shuffle-service and squashes the following commits:

ef3ddae [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
0ee67a2 [Andrew Or] Minor wording suggestions
1c66046 [Andrew Or] Remove unused provided dependencies
0eb6233 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
6489db5 [Andrew Or] Try catch at the right places
7b71d8f [Andrew Or] Add detailed java docs + reword a few comments
d1124e4 [Andrew Or] Add security to shuffle service (INCOMPLETE)
5f8a96f [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
9b6e058 [Andrew Or] Address various feedback
f48b20c [Andrew Or] Fix tests again
f39daa6 [Andrew Or] Do not make network-yarn an assembly module
761f58a [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-shuffle-service
15a5b37 [Andrew Or] Fix build for Hadoop 1.x
baff916 [Andrew Or] Fix tests
5bf9b7e [Andrew Or] Address a few minor comments
5b419b8 [Andrew Or] Add missing license header
804e7ff [Andrew Or] Include the Yarn shuffle service jar in the distribution
cd076a4 [Andrew Or] Require external shuffle service for dynamic allocation
ea764e0 [Andrew Or] Connect to Yarn shuffle service only if it's enabled
1bf5109 [Andrew Or] Use the shuffle service port specified through hadoop config
b4b1f0c [Andrew Or] 4 tabs -> 2 tabs
43dcb96 [Andrew Or] First cut integration of shuffle service with Yarn aux service
b54a0c4 [Andrew Or] Initial skeleton for Yarn shuffle service
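For context, a minimal sketch of what enabling this end to end could look like. The NodeManager-side keys and the auxiliary service name/class are assumptions based on the `network/yarn` module this PR describes, and `spark.dynamicAllocation.enabled` is the standard switch rather than something shown in this diff:

    // NodeManager side, normally set in yarn-site.xml (sketched as comments):
    //   yarn.nodemanager.aux-services = spark_shuffle
    //   yarn.nodemanager.aux-services.spark_shuffle.class =
    //     org.apache.spark.network.yarn.YarnShuffleService
    // Application side: turn on dynamic allocation and the external service.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "10")
      .set("spark.shuffle.service.enabled", "true") // now required, see below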
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala      |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                | 16
3 files changed, 49 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index c11f1db006..ef93009a07 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
- verifyBounds()
// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
@@ -77,9 +76,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
// How long an executor must be idle for before it is removed
- private val removeThresholdSeconds = conf.getLong(
+ private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)
+ // During testing, the methods to actually kill and add executors are mocked out
+ private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+
+ validateSettings()
+
// Number of executors to add in the next round
private var numExecutorsToAdd = 1
@@ -103,17 +107,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Polling loop interval (ms)
private val intervalMillis: Long = 100
- // Whether we are testing this class. This should only be used internally.
- private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
-
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock
/**
- * Verify that the lower and upper bounds on the number of executors are valid.
+ * Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
*/
- private def verifyBounds(): Unit = {
+ private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
}
@@ -124,6 +125,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
+ if (schedulerBacklogTimeout <= 0) {
+ throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
+ }
+ if (sustainedSchedulerBacklogTimeout <= 0) {
+ throw new SparkException(
+ "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
+ }
+ if (executorIdleTimeout <= 0) {
+ throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
+ }
+ // Require external shuffle service for dynamic allocation
+ // Otherwise, we may lose shuffle files when killing executors
+ if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+ throw new SparkException("Dynamic allocation of executors requires the external " +
+ "shuffle service. You may enable this through spark.shuffle.service.enabled.")
+ }
}
/**
@@ -254,7 +271,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
- s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
+ s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
@@ -329,8 +346,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
- s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
- removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
+ s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
+ removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
}
}
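To make the new fail-fast behavior concrete, here is a sketch of a configuration that validateSettings() now rejects. The keys and messages are the ones in the hunks above; since ExecutorAllocationManager is private[spark], the exception surfaces when the SparkContext starts with dynamic allocation enabled rather than from user code:

    import org.apache.spark.SparkConf

    val badConf = new SparkConf()
      .set("spark.dynamicAllocation.minExecutors", "10")
      .set("spark.dynamicAllocation.maxExecutors", "5") // min > max
      .set("spark.shuffle.service.enabled", "true")
    // Startup now throws:
    //   SparkException: spark.dynamicAllocation.minExecutors (10) must be
    //   less than or equal to spark.dynamicAllocation.maxExecutors (5)!
    // Similarly, leaving spark.shuffle.service.enabled unset (outside of
    // tests) fails fast instead of losing shuffle files when executors die.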
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a5fb87b9b2..e48d7772d6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,7 +40,6 @@ import org.apache.spark.network.util.{ConfigProvider, TransportConf}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.shuffle.hash.HashShuffleManager
-import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.util._
private[spark] sealed trait BlockValues
@@ -97,7 +96,12 @@ private[spark] class BlockManager(
private[spark]
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
- private val externalShuffleServicePort = conf.getInt("spark.shuffle.service.port", 7337)
+
+ // Port used by the external shuffle service. In Yarn mode, this may already be
+ // set through the Hadoop configuration, as the server is launched in the Yarn NM.
+ private val externalShuffleServicePort =
+ Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
+
// Check that we're not using external shuffle service with consolidated shuffle files.
if (externalShuffleServiceEnabled
&& conf.getBoolean("spark.shuffle.consolidateFiles", false)
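The new comment above alludes to the Hadoop configuration; a hypothetical NodeManager-side override is sketched below. Placing the Spark key directly in yarn-site.xml is an assumption consistent with how getSparkOrYarnConfig (added in the Utils.scala hunk below) reads the Hadoop configuration:

    // Hypothetical yarn-site.xml entry on each NodeManager:
    //   <property>
    //     <name>spark.shuffle.service.port</name>
    //     <value>7447</value>
    //   </property>
    // In Yarn mode, externalShuffleServicePort then resolves to 7447; when
    // the key is absent (or off Yarn), it falls back to the default "7337".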
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6ab94af9f3..7caf6bcf94 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -45,6 +45,7 @@ import org.json4s._
import tachyon.client.{TachyonFile, TachyonFS}
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
/** CallSite represents a place in user code. It can have a short and a long form. */
@@ -1780,6 +1781,21 @@ private[spark] object Utils extends Logging {
val manifest = new JarManifest(manifestUrl.openStream())
manifest.getMainAttributes.getValue(Name.IMPLEMENTATION_VERSION)
}.getOrElse("Unknown")
+
+ /**
+ * Return the value of a config either through the SparkConf or, in Yarn mode, through
+ * the Hadoop configuration. In the latter case, the value set through SparkConf serves
+ * as the default when the key is not set in the Hadoop configuration.
+ */
+ def getSparkOrYarnConfig(conf: SparkConf, key: String, default: String): String = {
+ val sparkValue = conf.get(key, default)
+ if (SparkHadoopUtil.get.isYarnMode) {
+ SparkHadoopUtil.get.newConfiguration(conf).get(key, sparkValue)
+ } else {
+ sparkValue
+ }
+ }
+
}
/**
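A minimal usage sketch of the new helper, mirroring the BlockManager call site above with the key and default this diff wires up. Utils is private[spark], so this illustrates internal call sites rather than user code:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val conf = new SparkConf()
    // Off Yarn this returns "7337", the SparkConf fallback passed in. On
    // Yarn it returns the Hadoop configuration's value for the key, using
    // the SparkConf value as the default when the Hadoop side does not set it.
    val port = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt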