author     Sandy Ryza <sandy@cloudera.com>      2014-06-11 07:57:28 -0500
committer  Thomas Graves <tgraves@apache.org>   2014-06-11 07:57:28 -0500
commit     2a4225dd944441d3f735625bb6bae6fca8fd06ca (patch)
tree       c539c35c9eb73bc757f9e2d1b1213844e6dd5dd6 /yarn/common
parent     6e11930310e3864790d0e30f0df7bf691cbeb85d (diff)
SPARK-1639. Tidy up some Spark on YARN code
This contains a bunch of small tidyings of the Spark on YARN code. I focused on the
yarn stable code. @tgravescs, let me know if you'd like me to make these for the
alpha code as well.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #561 from sryza/sandy-spark-1639 and squashes the following commits:

72b6a02 [Sandy Ryza] Fix comment and set name on driver thread
c2190b2 [Sandy Ryza] SPARK-1639. Tidy up some Spark on YARN code
Diffstat (limited to 'yarn/common')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala                 | 38
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala       | 28
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala | 10
3 files changed, 38 insertions(+), 38 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 801e8b3815..29a35680c0 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
-import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -37,7 +36,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SparkConf, SparkContext}
/**
@@ -169,14 +168,13 @@ trait ClientBase extends Logging {
destPath
}
- def qualifyForLocal(localURI: URI): Path = {
+ private def qualifyForLocal(localURI: URI): Path = {
var qualifiedURI = localURI
- // If not specified assume these are in the local filesystem to keep behavior like Hadoop
+ // If not specified, assume these are in the local filesystem to keep behavior like Hadoop
if (qualifiedURI.getScheme() == null) {
qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString)
}
- val qualPath = new Path(qualifiedURI)
- qualPath
+ new Path(qualifiedURI)
}
def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
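
For readability, here is the post-patch shape of the tidied helper, reconstructed from the hunk above as a standalone sketch. The wrapping object, main method, and local Configuration are illustrative only and assume hadoop-common on the classpath; they are not part of ClientBase.

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object QualifyForLocalSketch {
  // Stands in for the Hadoop Configuration that ClientBase holds as `conf`.
  private val conf = new Configuration()

  // Post-patch shape: the method is now private, the comment is punctuated,
  // and the intermediate `qualPath` val is replaced by a direct return.
  private def qualifyForLocal(localURI: URI): Path = {
    var qualifiedURI = localURI
    // If not specified, assume these are in the local filesystem to keep behavior like Hadoop
    if (qualifiedURI.getScheme() == null) {
      qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString)
    }
    new Path(qualifiedURI)
  }

  def main(args: Array[String]): Unit = {
    // A scheme-less URI gets qualified against the local filesystem.
    println(qualifyForLocal(new URI("/tmp/app.jar")))  // e.g. file:/tmp/app.jar
  }
}
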
@@ -305,13 +303,13 @@ trait ClientBase extends Logging {
val amMemory = calculateAMMemory(newApp)
- val JAVA_OPTS = ListBuffer[String]()
+ val javaOpts = ListBuffer[String]()
// Add Xmx for AM memory
- JAVA_OPTS += "-Xmx" + amMemory + "m"
+ javaOpts += "-Xmx" + amMemory + "m"
val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
- JAVA_OPTS += "-Djava.io.tmpdir=" + tmpDir
+ javaOpts += "-Djava.io.tmpdir=" + tmpDir
// TODO: Remove once cpuset version is pushed out.
// The context is, default gc for server class machines ends up using all cores to do gc -
@@ -325,11 +323,11 @@ trait ClientBase extends Logging {
if (useConcurrentAndIncrementalGC) {
// In our experiments, using the (default) throughput collector has severe performance
// ramifications on multi-tenant machines
- JAVA_OPTS += "-XX:+UseConcMarkSweepGC"
- JAVA_OPTS += "-XX:+CMSIncrementalMode"
- JAVA_OPTS += "-XX:+CMSIncrementalPacing"
- JAVA_OPTS += "-XX:CMSIncrementalDutyCycleMin=0"
- JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10"
+ javaOpts += "-XX:+UseConcMarkSweepGC"
+ javaOpts += "-XX:+CMSIncrementalMode"
+ javaOpts += "-XX:+CMSIncrementalPacing"
+ javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
+ javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}
// SPARK_JAVA_OPTS is deprecated, but for backwards compatibility:
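
The rename inside the GC branch is mechanical, but the pattern reads more clearly as a standalone sketch. This is a hedged reconstruction: the source of useConcurrentAndIncrementalGC lies outside this hunk, so the environment-variable check below is an assumption for illustration.

import scala.collection.mutable.ListBuffer

object GcOptsSketch {
  def main(args: Array[String]): Unit = {
    val javaOpts = ListBuffer[String]()

    // Hypothetical stand-in for the flag ClientBase consults.
    val useConcurrentAndIncrementalGC = sys.env.contains("SPARK_USE_CONC_INCR_GC")

    if (useConcurrentAndIncrementalGC) {
      // Same CMS incremental-mode flags as the hunk above; note they target a
      // 2014-era JVM and are deprecated or removed on modern JDKs.
      javaOpts += "-XX:+UseConcMarkSweepGC"
      javaOpts += "-XX:+CMSIncrementalMode"
      javaOpts += "-XX:+CMSIncrementalPacing"
      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
    }
    println(javaOpts.mkString(" "))
  }
}
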
@@ -344,22 +342,22 @@ trait ClientBase extends Logging {
// If we are being launched in client mode, forward the spark-conf options
// onto the executor launcher
for ((k, v) <- sparkConf.getAll) {
- JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+ javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
}
} else {
// If we are being launched in standalone mode, capture and forward any spark
// system properties (e.g. set by spark-class).
for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
- JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+ javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
}
- sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts)
- sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p")
+ sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts)
+ sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p")
}
- JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+ javaOpts += ClientBase.getLog4jConfiguration(localResources)
// Command for the ApplicationMaster
val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
- JAVA_OPTS ++
+ javaOpts ++
Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar,
userArgsToString(args),
"--executor-memory", args.executorMemory.toString,
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 32f8861dc9..43dbb2464f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{Logging, SparkConf}
@@ -46,19 +46,19 @@ trait ExecutorRunnableUtil extends Logging {
executorCores: Int,
localResources: HashMap[String, LocalResource]): List[String] = {
// Extra options for the JVM
- val JAVA_OPTS = ListBuffer[String]()
+ val javaOpts = ListBuffer[String]()
// Set the JVM memory
val executorMemoryString = executorMemory + "m"
- JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
+ javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
// Set extra Java options for the executor, if defined
sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
- JAVA_OPTS += opts
+ javaOpts += opts
}
- JAVA_OPTS += "-Djava.io.tmpdir=" +
+ javaOpts += "-Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
- JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+ javaOpts += ClientBase.getLog4jConfiguration(localResources)
// Certain configs need to be passed here because they are needed before the Executor
// registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -66,10 +66,10 @@ trait ExecutorRunnableUtil extends Logging {
// authentication settings.
sparkConf.getAll.
filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
- foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+ foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
sparkConf.getAkkaConf.
- foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+ foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
// Commenting it out for now - so that people can refer to the properties if required. Remove
// it once cpuset version is pushed out.
@@ -88,11 +88,11 @@ trait ExecutorRunnableUtil extends Logging {
// multi-tenant machines
// The options are based on
// http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
- JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
- JAVA_OPTS += " -XX:+CMSIncrementalMode "
- JAVA_OPTS += " -XX:+CMSIncrementalPacing "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+ javaOpts += " -XX:+UseConcMarkSweepGC "
+ javaOpts += " -XX:+CMSIncrementalMode "
+ javaOpts += " -XX:+CMSIncrementalPacing "
+ javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
+ javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
}
*/
@@ -104,7 +104,7 @@ trait ExecutorRunnableUtil extends Logging {
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do
// 'something' to fail job ... akin to blacklisting trackers in mapred ?
"-XX:OnOutOfMemoryError='kill %p'") ++
- JAVA_OPTS ++
+ javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
masterAddress.toString,
slaveId.toString,
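
The ExecutorRunnableUtil hunks apply the same rename. The auth/akka filter is the one non-mechanical piece worth isolating: only those keys must ride on the executor command line, because the executor needs them before it registers with the scheduler and receives the rest of the config. A self-contained sketch, with a hypothetical config list standing in for sparkConf.getAll:

import scala.collection.mutable.ListBuffer

object ExecutorOptsSketch {
  def main(args: Array[String]): Unit = {
    val javaOpts = ListBuffer[String]()

    // Hypothetical stand-in for sparkConf.getAll.
    val allConf = Seq(
      "spark.authenticate" -> "true",
      "spark.akka.frameSize" -> "10",
      "spark.ui.port" -> "4040")

    allConf.
      filter { case (k, _) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }

    javaOpts.foreach(println)  // spark.ui.port is filtered out
  }
}
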
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index a4638cc863..39cdd2e8a5 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -33,10 +33,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
def this(sc: SparkContext) = this(sc, new Configuration())
- // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
- // Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?)
- // Subsequent creations are ignored - since nodes are already allocated by then.
-
+ // Nothing else for now ... initialize application master : which needs a SparkContext to
+ // determine how to allocate.
+ // Note that only the first creation of a SparkContext influences (and ideally, there must be
+ // only one SparkContext, right ?). Subsequent creations are ignored since executors are already
+ // allocated by then.
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
@@ -48,6 +49,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
override def postStartHook() {
val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
if (sparkContextInitialized){
+ ApplicationMaster.waitForInitialAllocations()
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
Thread.sleep(3000L)
}
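
The waitForInitialAllocations() call above is the one behavioral change in the patch: the hook now blocks until the initial container allocations complete before the fixed sleep. A minimal sketch of the resulting flow, with stub stand-ins for the real org.apache.spark.deploy.yarn.ApplicationMaster methods (the signatures here are assumptions for illustration):

object PostStartHookSketch {
  // Stand-ins for the real ApplicationMaster calls used in the hunk above.
  object ApplicationMaster {
    def sparkContextInitialized(sc: AnyRef): Boolean = true
    def waitForInitialAllocations(): Unit = ()  // real version blocks until YARN grants the first containers
  }

  def postStartHook(sc: AnyRef): Unit = {
    val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
    if (sparkContextInitialized) {
      // New in this patch: wait for the initial allocations to finish, then
      // still give the slaves a few seconds to bootstrap and register.
      ApplicationMaster.waitForInitialAllocations()
      Thread.sleep(3000L)
    }
  }

  def main(args: Array[String]): Unit = postStartHook(new Object)
}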