author     Sandy Ryza <sandy@cloudera.com>        2014-06-11 07:57:28 -0500
committer  Thomas Graves <tgraves@apache.org>     2014-06-11 07:57:28 -0500
commit     2a4225dd944441d3f735625bb6bae6fca8fd06ca (patch)
tree       c539c35c9eb73bc757f9e2d1b1213844e6dd5dd6
parent     6e11930310e3864790d0e30f0df7bf691cbeb85d (diff)
SPARK-1639. Tidy up some Spark on YARN code
This contains a bunch of small tidyings of the Spark on YARN code. I focused on the yarn stable code. @tgravescs, let me know if you'd like me to make these for the alpha code as well.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #561 from sryza/sandy-spark-1639 and squashes the following commits:

72b6a02 [Sandy Ryza] Fix comment and set name on driver thread
c2190b2 [Sandy Ryza] SPARK-1639. Tidy up some Spark on YARN code
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  16
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala  38
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala  28
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala  10
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  197
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala  10
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala  40
7 files changed, 161 insertions(+), 178 deletions(-)
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8f0ecb8557..1cc9c33cd2 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -277,7 +277,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
yarnAllocator.allocateContainers(
math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
- Thread.sleep(100)
+ Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
}
} finally {
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
@@ -416,6 +416,7 @@ object ApplicationMaster {
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+ private val ALLOCATE_HEARTBEAT_INTERVAL = 100
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
@@ -467,13 +468,22 @@ object ApplicationMaster {
})
}
- // Wait for initialization to complete and atleast 'some' nodes can get allocated.
+ modified
+ }
+
+
+ /**
+ * Returns when we've either
+ * 1) received all the requested executors,
+ * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
+ * 3) hit an error that causes us to terminate trying to get containers.
+ */
+ def waitForInitialAllocations() {
yarnAllocatorLoop.synchronized {
while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
}
}
- modified
}
def main(argStrings: Array[String]) {
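A minimal, self-contained Scala sketch (no YARN dependencies) of the counter handshake the alpha code path keeps after this change: the allocate loop bumps an AtomicInteger once per heartbeat, and waitForInitialAllocations() returns once the count passes ALLOCATOR_LOOP_WAIT_COUNT. The constants match the patch; the thread below only simulates the allocate loop.

import java.util.concurrent.atomic.AtomicInteger

object AllocatorLoopSketch {
  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
  private val yarnAllocatorLoop = new AtomicInteger(0)

  def incrementAllocatorLoop(by: Int) {
    val count = yarnAllocatorLoop.getAndAdd(by)
    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
      yarnAllocatorLoop.synchronized {
        // Wake any thread blocked in waitForInitialAllocations().
        yarnAllocatorLoop.notifyAll()
      }
    }
  }

  def waitForInitialAllocations() {
    yarnAllocatorLoop.synchronized {
      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
        yarnAllocatorLoop.wait(1000L)
      }
    }
  }

  def main(args: Array[String]) {
    val allocator = new Thread {
      override def run() {
        // Stand-in for the allocate loop: one increment per heartbeat
        // (roughly 3 seconds total with the patch's constants).
        while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
          incrementAllocatorLoop(1)
          Thread.sleep(ALLOCATE_HEARTBEAT_INTERVAL)
        }
      }
    }
    allocator.setDaemon(true)
    allocator.start()
    waitForInitialAllocations()
    println("initial allocation wait finished after " + yarnAllocatorLoop.get() + " iterations")
  }
}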
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 801e8b3815..29a35680c0 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
-import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -37,7 +36,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SparkConf, SparkContext}
/**
@@ -169,14 +168,13 @@ trait ClientBase extends Logging {
destPath
}
- def qualifyForLocal(localURI: URI): Path = {
+ private def qualifyForLocal(localURI: URI): Path = {
var qualifiedURI = localURI
- // If not specified assume these are in the local filesystem to keep behavior like Hadoop
+ // If not specified, assume these are in the local filesystem to keep behavior like Hadoop
if (qualifiedURI.getScheme() == null) {
qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString)
}
- val qualPath = new Path(qualifiedURI)
- qualPath
+ new Path(qualifiedURI)
}
def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
@@ -305,13 +303,13 @@ trait ClientBase extends Logging {
val amMemory = calculateAMMemory(newApp)
- val JAVA_OPTS = ListBuffer[String]()
+ val javaOpts = ListBuffer[String]()
// Add Xmx for AM memory
- JAVA_OPTS += "-Xmx" + amMemory + "m"
+ javaOpts += "-Xmx" + amMemory + "m"
val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
- JAVA_OPTS += "-Djava.io.tmpdir=" + tmpDir
+ javaOpts += "-Djava.io.tmpdir=" + tmpDir
// TODO: Remove once cpuset version is pushed out.
// The context is, default gc for server class machines ends up using all cores to do gc -
@@ -325,11 +323,11 @@ trait ClientBase extends Logging {
if (useConcurrentAndIncrementalGC) {
// In our expts, using (default) throughput collector has severe perf ramifications in
// multi-tenant machines
- JAVA_OPTS += "-XX:+UseConcMarkSweepGC"
- JAVA_OPTS += "-XX:+CMSIncrementalMode"
- JAVA_OPTS += "-XX:+CMSIncrementalPacing"
- JAVA_OPTS += "-XX:CMSIncrementalDutyCycleMin=0"
- JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10"
+ javaOpts += "-XX:+UseConcMarkSweepGC"
+ javaOpts += "-XX:+CMSIncrementalMode"
+ javaOpts += "-XX:+CMSIncrementalPacing"
+ javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
+ javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}
// SPARK_JAVA_OPTS is deprecated, but for backwards compatibility:
@@ -344,22 +342,22 @@ trait ClientBase extends Logging {
// If we are being launched in client mode, forward the spark-conf options
// onto the executor launcher
for ((k, v) <- sparkConf.getAll) {
- JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+ javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
}
} else {
// If we are being launched in standalone mode, capture and forward any spark
// system properties (e.g. set by spark-class).
for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
- JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+ javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
}
- sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts)
- sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p")
+ sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts)
+ sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p")
}
- JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+ javaOpts += ClientBase.getLog4jConfiguration(localResources)
// Command for the ApplicationMaster
val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
- JAVA_OPTS ++
+ javaOpts ++
Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar,
userArgsToString(args),
"--executor-memory", args.executorMemory.toString,
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 32f8861dc9..43dbb2464f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{Logging, SparkConf}
@@ -46,19 +46,19 @@ trait ExecutorRunnableUtil extends Logging {
executorCores: Int,
localResources: HashMap[String, LocalResource]): List[String] = {
// Extra options for the JVM
- val JAVA_OPTS = ListBuffer[String]()
+ val javaOpts = ListBuffer[String]()
// Set the JVM memory
val executorMemoryString = executorMemory + "m"
- JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
+ javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
// Set extra Java options for the executor, if defined
sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
- JAVA_OPTS += opts
+ javaOpts += opts
}
- JAVA_OPTS += "-Djava.io.tmpdir=" +
+ javaOpts += "-Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
- JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+ javaOpts += ClientBase.getLog4jConfiguration(localResources)
// Certain configs need to be passed here because they are needed before the Executor
// registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -66,10 +66,10 @@ trait ExecutorRunnableUtil extends Logging {
// authentication settings.
sparkConf.getAll.
filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
- foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+ foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
sparkConf.getAkkaConf.
- foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+ foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
// Commenting it out for now - so that people can refer to the properties if required. Remove
// it once cpuset version is pushed out.
@@ -88,11 +88,11 @@ trait ExecutorRunnableUtil extends Logging {
// multi-tennent machines
// The options are based on
// http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
- JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
- JAVA_OPTS += " -XX:+CMSIncrementalMode "
- JAVA_OPTS += " -XX:+CMSIncrementalPacing "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+ javaOpts += " -XX:+UseConcMarkSweepGC "
+ javaOpts += " -XX:+CMSIncrementalMode "
+ javaOpts += " -XX:+CMSIncrementalPacing "
+ javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
+ javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
}
*/
@@ -104,7 +104,7 @@ trait ExecutorRunnableUtil extends Logging {
// TODO: If the OOM is not recoverable by rescheduling it on different node, then do
// 'something' to fail job ... akin to blacklisting trackers in mapred ?
"-XX:OnOutOfMemoryError='kill %p'") ++
- JAVA_OPTS ++
+ javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
masterAddress.toString,
slaveId.toString,
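The -D forwarding in this file can be exercised on its own; a small sketch, with made-up settings standing in for sparkConf.getAll, showing how only auth and Akka keys are passed through and each value is wrapped in escaped quotes so it survives the container launch shell.

import scala.collection.mutable.ListBuffer

object ForwardedPropsSketch {
  def main(args: Array[String]) {
    // Made-up settings standing in for sparkConf.getAll.
    val confEntries = Seq(
      "spark.authenticate" -> "true",
      "spark.akka.frameSize" -> "10",
      "spark.app.name" -> "demo")

    val javaOpts = ListBuffer[String]()
    confEntries.
      filter { case (k, _) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }

    // Prints e.g. -Dspark.authenticate=\"true\"; spark.app.name is filtered out.
    javaOpts.foreach(println)
  }
}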
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index a4638cc863..39cdd2e8a5 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -33,10 +33,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
def this(sc: SparkContext) = this(sc, new Configuration())
- // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
- // Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?)
- // Subsequent creations are ignored - since nodes are already allocated by then.
-
+ // Nothing else for now ... initialize application master : which needs a SparkContext to
+ // determine how to allocate.
+ // Note that only the first creation of a SparkContext influences (and ideally, there must be
+ // only one SparkContext, right ?). Subsequent creations are ignored since executors are already
+ // allocated by then.
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
@@ -48,6 +49,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
override def postStartHook() {
val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
if (sparkContextInitialized){
+ ApplicationMaster.waitForInitialAllocations()
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
Thread.sleep(3000L)
}
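A sketch of the "first SparkContext wins" registration that postStartHook relies on before calling waitForInitialAllocations(). FakeContext is a stand-in for SparkContext, and compareAndSet is used here for brevity; the real sparkContextInitialized() synchronizes on the reference and notifies waiters.

import java.util.concurrent.atomic.AtomicReference

object FirstContextWinsSketch {
  // Stand-in for SparkContext; only identity matters for the sketch.
  final class FakeContext(val name: String)

  private val sparkContextRef = new AtomicReference[FakeContext](null)

  // Returns true only for the first context handed in; later ones are ignored,
  // since executors are already being allocated by then.
  def sparkContextInitialized(sc: FakeContext): Boolean =
    sparkContextRef.compareAndSet(null, sc)

  def main(args: Array[String]) {
    println(sparkContextInitialized(new FakeContext("first")))   // true
    println(sparkContextInitialized(new FakeContext("second")))  // false
  }
}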
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 33a60d978c..6244332f23 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,13 +19,12 @@ package org.apache.spark.deploy.yarn
import java.io.IOException
import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+import java.util.concurrent.atomic.AtomicReference
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.protocolrecords._
@@ -33,8 +32,7 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.webapp.util.WebAppUtils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
@@ -77,17 +75,18 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// than user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())
- // set the web ui port to be ephemeral for yarn so we don't conflict with
+ // Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
- // when running the AM, the Spark master is always "yarn-cluster"
+ // When running the AM, the Spark master is always "yarn-cluster"
System.setProperty("spark.master", "yarn-cluster")
- // Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using.
+ // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
- appAttemptId = getApplicationAttemptId()
+ appAttemptId = ApplicationMaster.getApplicationAttemptId()
+ logInfo("ApplicationAttemptId: " + appAttemptId)
isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
amClient = AMRMClient.createAMRMClient()
amClient.init(yarnConf)
@@ -99,7 +98,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
ApplicationMaster.register(this)
// Call this to force generation of secret so it gets populated into the
- // hadoop UGI. This has to happen before the startUserClass which does a
+ // Hadoop UGI. This has to happen before the startUserClass which does a
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
@@ -121,7 +120,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// Allocate all containers
allocateExecutors()
- // Wait for the user class to Finish
+ // Launch thread that will heartbeat to the RM so it won't think the app has died.
+ launchReporterThread()
+
+ // Wait for the user class to finish
userThread.join()
System.exit(0)
@@ -141,7 +143,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
"spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
}
- /** Get the Yarn approved local directories. */
+ // Get the Yarn approved local directories.
private def getLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
@@ -150,18 +152,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
.orElse(Option(System.getenv("LOCAL_DIRS")))
localDirs match {
- case None => throw new Exception("Yarn Local dirs can't be empty")
+ case None => throw new Exception("Yarn local dirs can't be empty")
case Some(l) => l
}
- }
-
- private def getApplicationAttemptId(): ApplicationAttemptId = {
- val envs = System.getenv()
- val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- appAttemptId
}
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
@@ -173,25 +166,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(
args.userClass,
- false /* initialize */ ,
+ false,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
-
- var successed = false
+ var succeeded = false
try {
// Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+ val mainArgs = new Array[String](args.userArgs.size)
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
mainMethod.invoke(null, mainArgs)
- // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
- // userThread will stop here unless it has uncaught exception thrown out
- // It need shutdown hook to set SUCCEEDED
- successed = true
+ // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
+ // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
+ succeeded = true
} finally {
- logDebug("finishing main")
+ logDebug("Finishing main")
isLastAMRetry = true
- if (successed) {
+ if (succeeded) {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} else {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
@@ -199,11 +190,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
}
+ t.setName("Driver")
t.start()
t
}
- // This need to happen before allocateExecutors()
+ // This needs to happen before allocateExecutors()
private def waitForSparkContextInitialized() {
logInfo("Waiting for Spark context initialization")
try {
@@ -231,7 +223,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkContext.preferredNodeLocationData,
sparkContext.getConf)
} else {
- logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d".
+ logWarning("Unable to retrieve SparkContext in spite of waiting for %d, maxNumTries = %d".
format(numTries * waitTime, maxNumTries))
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
@@ -242,48 +234,37 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
} finally {
- // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT :
- // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks.
- ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ // In case of exceptions, etc - ensure that the loop in
+ // ApplicationMaster#sparkContextInitialized() breaks.
+ ApplicationMaster.doneWithInitialAllocations()
}
}
private def allocateExecutors() {
try {
- logInfo("Allocating " + args.numExecutors + " executors.")
- // Wait until all containers have finished
+ logInfo("Requesting" + args.numExecutors + " executors.")
+ // Wait until all containers have launched
yarnAllocator.addResourceRequests(args.numExecutors)
yarnAllocator.allocateResources()
// Exits the loop if the user thread exits.
+
+ var iters = 0
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
checkNumExecutorsFailed()
allocateMissingExecutor()
yarnAllocator.allocateResources()
- ApplicationMaster.incrementAllocatorLoop(1)
- Thread.sleep(100)
+ if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) {
+ ApplicationMaster.doneWithInitialAllocations()
+ }
+ Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
+ iters += 1
}
} finally {
- // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
- // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
- ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ // In case of exceptions, etc - ensure that the loop in
+ // ApplicationMaster#sparkContextInitialized() breaks.
+ ApplicationMaster.doneWithInitialAllocations()
}
logInfo("All executors have launched.")
-
- // Launch a progress reporter thread, else the app will get killed after expiration
- // (def: 10mins) timeout.
- if (userThread.isAlive) {
- // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
- val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
- // we want to be reasonably responsive without causing too many requests to RM.
- val schedulerInterval =
- sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
-
- // must be <= timeoutInterval / 2.
- val interval = math.min(timeoutInterval / 2, schedulerInterval)
-
- launchReporterThread(interval)
- }
}
private def allocateMissingExecutor() {
@@ -303,47 +284,35 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}
- private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
+ private def launchReporterThread(): Thread = {
+ // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+ val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
+ // must be <= expiryInterval / 2.
+ val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
val t = new Thread {
override def run() {
while (userThread.isAlive) {
checkNumExecutorsFailed()
allocateMissingExecutor()
- sendProgress()
- Thread.sleep(sleepTime)
+ logDebug("Sending progress")
+ yarnAllocator.allocateResources()
+ Thread.sleep(interval)
}
}
}
// Setting to daemon status, though this is usually not a good idea.
t.setDaemon(true)
t.start()
- logInfo("Started progress reporter thread - sleep time : " + sleepTime)
+ logInfo("Started progress reporter thread - heartbeat interval : " + interval)
t
}
- private def sendProgress() {
- logDebug("Sending progress")
- // Simulated with an allocate request with no nodes requested.
- yarnAllocator.allocateResources()
- }
-
- /*
- def printContainers(containers: List[Container]) = {
- for (container <- containers) {
- logInfo("Launching shell command on a new container."
- + ", containerId=" + container.getId()
- + ", containerNode=" + container.getNodeId().getHost()
- + ":" + container.getNodeId().getPort()
- + ", containerNodeURI=" + container.getNodeHttpAddress()
- + ", containerState" + container.getState()
- + ", containerResourceMemory"
- + container.getResource().getMemory())
- }
- }
- */
-
def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
synchronized {
if (isFinished) {
@@ -351,7 +320,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
isFinished = true
- logInfo("finishApplicationMaster with " + status)
+ logInfo("Unregistering ApplicationMaster with " + status)
if (registered) {
val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
amClient.unregisterApplicationMaster(status, diagnostics, trackingUrl)
@@ -386,7 +355,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
def run() {
logInfo("AppMaster received a signal.")
- // we need to clean up staging dir before HDFS is shut down
+ // We need to clean up staging dir before HDFS is shut down
// make sure we don't delete it until this is the last AM
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
@@ -401,21 +370,24 @@ object ApplicationMaster {
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+ private val ALLOCATE_HEARTBEAT_INTERVAL = 100
private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
val sparkContextRef: AtomicReference[SparkContext] =
- new AtomicReference[SparkContext](null /* initialValue */)
+ new AtomicReference[SparkContext](null)
- val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
+ // Variable used to notify the YarnClusterScheduler that it should stop waiting
+ // for the initial set of executors to be started and get on with its business.
+ val doneWithInitialAllocationsMonitor = new Object()
- def incrementAllocatorLoop(by: Int) {
- val count = yarnAllocatorLoop.getAndAdd(by)
- if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
- yarnAllocatorLoop.synchronized {
- // to wake threads off wait ...
- yarnAllocatorLoop.notifyAll()
- }
+ @volatile var isDoneWithInitialAllocations = false
+
+ def doneWithInitialAllocations() {
+ isDoneWithInitialAllocations = true
+ doneWithInitialAllocationsMonitor.synchronized {
+ // to wake threads off wait ...
+ doneWithInitialAllocationsMonitor.notifyAll()
}
}
@@ -423,7 +395,10 @@ object ApplicationMaster {
applicationMasters.add(master)
}
- // TODO(harvey): See whether this should be discarded - it isn't used anywhere atm...
+ /**
+ * Called from YarnClusterScheduler to notify the AM code that a SparkContext has been
+ * initialized in the user code.
+ */
def sparkContextInitialized(sc: SparkContext): Boolean = {
var modified = false
sparkContextRef.synchronized {
@@ -431,7 +406,7 @@ object ApplicationMaster {
sparkContextRef.notifyAll()
}
- // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
+ // Add a shutdown hook - as a best effort in case users do not call sc.stop or do
// System.exit.
// Should not really have to do this, but it helps YARN to evict resources earlier.
// Not to mention, prevent the Client from declaring failure even though we exited properly.
@@ -454,13 +429,29 @@ object ApplicationMaster {
})
}
- // Wait for initialization to complete and atleast 'some' nodes can get allocated.
- yarnAllocatorLoop.synchronized {
- while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
- yarnAllocatorLoop.wait(1000L)
+ // Wait for initialization to complete and at least 'some' nodes to get allocated.
+ modified
+ }
+
+ /**
+ * Returns when we've either
+ * 1) received all the requested executors,
+ * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
+ * 3) hit an error that causes us to terminate trying to get containers.
+ */
+ def waitForInitialAllocations() {
+ doneWithInitialAllocationsMonitor.synchronized {
+ while (!isDoneWithInitialAllocations) {
+ doneWithInitialAllocationsMonitor.wait(1000L)
}
}
- modified
+ }
+
+ def getApplicationAttemptId(): ApplicationAttemptId = {
+ val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ val appAttemptId = containerId.getApplicationAttemptId()
+ appAttemptId
}
def main(argStrings: Array[String]) {
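A minimal, self-contained sketch of the monitor-and-flag handshake that replaces the AtomicInteger counter in the stable ApplicationMaster: the allocate loop (or its finally block) calls doneWithInitialAllocations(), and waitForInitialAllocations() blocks until that happens. The allocator thread below merely simulates the real allocate loop.

object InitialAllocationsSketch {
  // Monitor plus flag, as introduced for the stable ApplicationMaster above.
  private val doneWithInitialAllocationsMonitor = new Object()
  @volatile private var isDoneWithInitialAllocations = false

  def doneWithInitialAllocations() {
    isDoneWithInitialAllocations = true
    doneWithInitialAllocationsMonitor.synchronized {
      // Wake threads blocked in waitForInitialAllocations().
      doneWithInitialAllocationsMonitor.notifyAll()
    }
  }

  def waitForInitialAllocations() {
    doneWithInitialAllocationsMonitor.synchronized {
      while (!isDoneWithInitialAllocations) {
        doneWithInitialAllocationsMonitor.wait(1000L)
      }
    }
  }

  def main(args: Array[String]) {
    val allocator = new Thread {
      override def run() {
        Thread.sleep(300)            // pretend the initial containers just launched
        doneWithInitialAllocations()
      }
    }
    allocator.setDaemon(true)
    allocator.start()
    waitForInitialAllocations()      // returns once the allocator signals
    println("done waiting for initial allocations")
  }
}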
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 393edd1f2d..24027618c1 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,14 +21,12 @@ import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SparkConf}
@@ -102,7 +100,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
def logClusterResourceDetails() {
val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics
- logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: " +
+ logInfo("Got Cluster metric info from ResourceManager, number of NodeManagers: " +
clusterMetrics.getNumNodeManagers)
val queueInfo: QueueInfo = yarnClient.getQueueInfo(args.amQueue)
@@ -133,7 +131,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
def submitApp(appContext: ApplicationSubmissionContext) = {
// Submit the application to the applications manager.
- logInfo("Submitting application to ASM")
+ logInfo("Submitting application to ResourceManager")
yarnClient.submitApplication(appContext)
}
@@ -149,7 +147,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
Thread.sleep(interval)
val report = yarnClient.getApplicationReport(appId)
- logInfo("Application report from ASM: \n" +
+ logInfo("Application report from ResourceManager: \n" +
"\t application identifier: " + appId.toString() + "\n" +
"\t appId: " + appId.getId() + "\n" +
"\t clientToAMToken: " + report.getClientToAMToken() + "\n" +
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index d93e5bb022..4f8854a09a 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -72,8 +72,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
- // Send a hello message thus the connection is actually established,
- // thus we can monitor Lifecycle Events.
+ // Send a hello message to establish the connection, after which
+ // we can monitor Lifecycle Events.
driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
@@ -95,7 +95,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
amClient.init(yarnConf)
amClient.start()
- appAttemptId = getApplicationAttemptId()
+ appAttemptId = ApplicationMaster.getApplicationAttemptId()
registerApplicationMaster()
waitForSparkMaster()
@@ -141,18 +141,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
}
- private def getApplicationAttemptId(): ApplicationAttemptId = {
- val envs = System.getenv()
- val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name())
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- appAttemptId
- }
-
private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
- // TODO:(Raymond) Find out Spark UI address and fill in here?
+ // TODO: Find out client's Spark UI address and fill in here?
amClient.registerApplicationMaster(Utils.localHostName(), 0, "")
}
@@ -185,8 +176,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
private def allocateExecutors() {
-
- // Fixme: should get preferredNodeLocationData from SparkContext, just fake a empty one for now.
+ // TODO: should get preferredNodeLocationData from SparkContext, just fake an empty one for now.
val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
scala.collection.immutable.Map()
@@ -198,8 +188,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
preferredNodeLocationData,
sparkConf)
- logInfo("Allocating " + args.numExecutors + " executors.")
- // Wait until all containers have finished
+ logInfo("Requesting " + args.numExecutors + " executors.")
+ // Wait until all containers have launched
yarnAllocator.addResourceRequests(args.numExecutors)
yarnAllocator.allocateResources()
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
@@ -221,7 +211,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
}
}
- // TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
@@ -229,7 +218,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
override def run() {
while (!driverClosed) {
allocateMissingExecutor()
- sendProgress()
+ logDebug("Sending progress")
+ yarnAllocator.allocateResources()
Thread.sleep(sleepTime)
}
}
@@ -241,20 +231,14 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
t
}
- private def sendProgress() {
- logDebug("Sending progress")
- // simulated with an allocate request with no nodes requested ...
- yarnAllocator.allocateResources()
- }
-
def finishApplicationMaster(status: FinalApplicationStatus) {
- logInfo("finish ApplicationMaster with " + status)
- amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
+ logInfo("Unregistering ApplicationMaster with " + status)
+ val trackingUrl = sparkConf.get("spark.yarn.historyServer.address", "")
+ amClient.unregisterApplicationMaster(status, "" /* appMessage */ , trackingUrl)
}
}
-
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
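Both reporter threads reduce to the same interval arithmetic the stable ApplicationMaster now computes inline: heartbeat at most every expiryInterval / 2 ms so the ResourceManager never expires the AM, but no more often than the configured scheduler interval, and never with a negative sleep. A sketch with the inputs hard-coded instead of read from YarnConfiguration and SparkConf:

object ReporterIntervalSketch {
  // Clamp to [0, expiryInterval / 2], preferring the configured scheduler interval.
  def reporterInterval(expiryIntervalMs: Long, schedulerIntervalMs: Long): Long =
    math.max(0, math.min(expiryIntervalMs / 2, schedulerIntervalMs))

  def main(args: Array[String]) {
    println(reporterInterval(120000, 5000))  // 5000 - default scheduler interval wins
    println(reporterInterval(6000, 5000))    // 3000 - capped at half the RM expiry interval
    println(reporterInterval(120000, -1))    // 0    - clamped, matching the old sleepTime guard
  }
}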