aboutsummaryrefslogtreecommitdiff
path: root/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
diff options
context:
space:
mode:
Diffstat (limited to 'new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala')
-rw-r--r--new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala52
1 files changed, 17 insertions, 35 deletions
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 91e35e2d34..7c32e0ab9b 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.util.Utils
@@ -60,14 +60,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var isLastAMRetry: Boolean = true
private var amClient: AMRMClient[ContainerRequest] = _
+ private val sparkConf = new SparkConf()
// Default to numWorkers * 2, with minimum of 3
- private val maxNumWorkerFailures = conf.getOrElse("spark.yarn.max.worker.failures",
- math.max(args.numWorkers * 2, 3).toString()).toInt
+ private val maxNumWorkerFailures = sparkConf.getInt("spark.yarn.max.worker.failures",
+ math.max(args.numWorkers * 2, 3))
def run() {
// Setup the directories so things go to YARN approved directories rather
// than user specified and /tmp.
- conf.set("spark.local.dir", getLocalDirs())
+ System.setProperty("spark.local.dir", getLocalDirs())
+
+ // set the web ui port to be ephemeral for yarn so we don't conflict with
+ // other spark processes running on the same box
+ System.setProperty("spark.ui.port", "0")
// Use priority 30 as it's higher then HDFS. It's same priority as MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
@@ -89,8 +94,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
- waitForSparkMaster()
-
waitForSparkContextInitialized()
// Do this after Spark master is up and SparkContext is created so that we can register UI Url.
@@ -134,30 +137,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
}
- private def waitForSparkMaster() {
- logInfo("Waiting for Spark driver to be reachable.")
- var driverUp = false
- var tries = 0
- val numTries = conf.getOrElse("spark.yarn.applicationMaster.waitTries", "10").toInt
- while (!driverUp && tries < numTries) {
- val driverHost = conf.get("spark.driver.host")
- val driverPort = conf.get("spark.driver.port")
- try {
- val socket = new Socket(driverHost, driverPort.toInt)
- socket.close()
- logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
- driverUp = true
- } catch {
- case e: Exception => {
- logWarning("Failed to connect to driver at %s:%s, retrying ...".
- format(driverHost, driverPort))
- Thread.sleep(100)
- tries = tries + 1
- }
- }
- }
- }
-
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(
@@ -199,7 +178,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ApplicationMaster.sparkContextRef.synchronized {
var numTries = 0
val waitTime = 10000L
- val maxNumTries = conf.getOrElse("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ val maxNumTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
while (ApplicationMaster.sparkContextRef.get() == null && numTries < maxNumTries) {
logInfo("Waiting for Spark context initialization ... " + numTries)
numTries = numTries + 1
@@ -215,7 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
amClient,
appAttemptId,
args,
- sparkContext.preferredNodeLocationData)
+ sparkContext.preferredNodeLocationData,
+ sparkContext.getConf)
} else {
logWarning("Unable to retrieve SparkContext inspite of waiting for %d, maxNumTries = %d".
format(numTries * waitTime, maxNumTries))
@@ -223,7 +203,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
yarnConf,
amClient,
appAttemptId,
- args)
+ args,
+ sparkContext.getConf)
}
}
} finally {
@@ -265,7 +246,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval =
- conf.getOrElse("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+ sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
+
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
@@ -343,7 +325,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private def cleanupStagingDir() {
var stagingDirPath: Path = null
try {
- val preserveFiles = conf.getOrElse("spark.yarn.preserve.staging.files", "false").toBoolean
+ val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
if (stagingDirPath == null) {