author    | Y.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com> | 2013-08-30 15:55:32 -0500
committer | Y.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com> | 2013-08-30 15:55:32 -0500
commit    | bac46266a97a6096d6d772e023a3362fd48baac0 (patch)
tree      | 26ec4fc4098b3c34a9277731c6cd7e3769b9b1e2 /yarn/src/main
parent    | 94bb7fd46e5586e1d08a99d21eecef93eeb4b97c (diff)
download  | spark-bac46266a97a6096d6d772e023a3362fd48baac0.tar.gz
          | spark-bac46266a97a6096d6d772e023a3362fd48baac0.tar.bz2
          | spark-bac46266a97a6096d6d772e023a3362fd48baac0.zip
Link the Spark UI to the Yarn UI
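
In short, the patch defers ApplicationMaster registration until the SparkContext (and therefore its web UI) is up, then hands the UI address to YARN as the tracking URL, so the ResourceManager UI links through to the Spark UI. Below is a minimal, self-contained sketch of that ordering; it uses stand-in types instead of the real SparkContext and YARN request records, and only the ordering and the tracking-URL hand-off mirror the diff that follows.

```scala
// Hypothetical sketch: stand-ins for the real SparkContext / YARN request types.
object TrackingUrlFlow {
  final case class RegisterRequest(host: String, rpcPort: Int, trackingUrl: String)

  private var uiAddress: String = ""

  // Mirrors waitForSparkContextInitialized(): poll until the UI address is known
  // (the real code waits on ApplicationMaster.sparkContextRef with a 10s timeout).
  def waitForSparkContextInitialized(appUIAddress: () => Option[String]): Unit = {
    val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
    var count = 0
    while (appUIAddress().isEmpty && count < numTries) {
      count += 1
      Thread.sleep(100L)
    }
    uiAddress = appUIAddress().getOrElse("") // stays empty if the context never came up
  }

  // Mirrors registerApplicationMaster(): the tracking URL is now the Spark UI address
  // instead of the previous hard-coded "".
  def registerApplicationMaster(): RegisterRequest =
    RegisterRequest(host = "localhost", rpcPort = 0, trackingUrl = uiAddress)

  def main(args: Array[String]): Unit = {
    waitForSparkContextInitialized(() => Some("http://driver-host:4040"))
    println(registerApplicationMaster())
  }
}
```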
Diffstat (limited to 'yarn/src/main')
  yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
  yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
  yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
3 files changed, 57 insertions, 18 deletions
diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
index 0f3b6bc1a6..d6acb080cc 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,6 +45,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private var yarnAllocator: YarnAllocationHandler = null
   private var isFinished:Boolean = false
 
+  private var uiAddress: String = ""
+
   def run() {
     // setup the directories so things go to yarn approved directories rather
@@ -53,21 +55,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
-    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
 
+    // TODO: Uncomment when hadoop is on a version which has this fixed.
     // Compute number of threads for akka
-    val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+    //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
 
-    if (minimumMemory > 0) {
-      val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
-      val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+    //if (minimumMemory > 0) {
+    //  val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    //  val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
 
-      if (numCore > 0) {
+    //  if (numCore > 0) {
         // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
         // TODO: Uncomment when hadoop is on a version which has this fixed.
         // args.workerCores = numCore
-      }
-    }
+    //  }
+    //}
 
     // Workaround until hadoop moves to something which has
     // https://issues.apache.org/jira/browse/HADOOP-8406
@@ -83,6 +85,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // This a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
     waitForSparkMaster()
+
+    waitForSparkContextInitialized()
+
+    // do this after spark master is up and SparkContext is created so that we can register UI Url
+    val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
 
     // Allocate all containers
     allocateWorkers()
@@ -134,8 +141,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // Users can then monitor stderr/stdout on that node if required.
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
-    // What do we provide here ? Might make sense to expose something sensible later ?
-    appMasterRequest.setTrackingUrl("")
+    appMasterRequest.setTrackingUrl(uiAddress)
     return resourceManager.registerApplicationMaster(appMasterRequest)
   }
 
@@ -143,7 +149,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     logInfo("Waiting for spark driver to be reachable.")
     var driverUp = false
     var tries = 0
-    while(!driverUp && tries < 10) {
+    val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+    while(!driverUp && tries < numTries) {
       val driverHost = System.getProperty("spark.driver.host")
       val driverPort = System.getProperty("spark.driver.port")
       try {
@@ -189,24 +196,44 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     return t
   }
 
-  private def allocateWorkers() {
+  // this need to happen before allocateWorkers
+  private def waitForSparkContextInitialized() {
     logInfo("Waiting for spark context initialization")
-
     try {
       var sparkContext: SparkContext = null
      ApplicationMaster.sparkContextRef.synchronized {
        var count = 0
-       while (ApplicationMaster.sparkContextRef.get() == null && count < 10) {
+       val waitTime = 10000L
+       val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+       while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
          logInfo("Waiting for spark context initialization ... " + count)
          count = count + 1
-         ApplicationMaster.sparkContextRef.wait(10000L)
+         ApplicationMaster.sparkContextRef.wait(waitTime)
        }
        sparkContext = ApplicationMaster.sparkContextRef.get()
-       assert(sparkContext != null)
-       this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, sparkContext.preferredNodeLocationData)
+       assert(sparkContext != null || count >= numTries)
+
+       if (null != sparkContext) {
+         uiAddress = sparkContext.ui.appUIAddress
+         this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
+           sparkContext.preferredNodeLocationData)
+       } else {
+         logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
+           ", numTries = " + numTries)
+         this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+       }
      }
+    } finally {
+      // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
+      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+    }
+  }
+
+  private def allocateWorkers() {
+    try {
      logInfo("Allocating " + args.numWorkers + " workers.")
      // Wait until all containers have finished
      // TODO: This is a bit ugly. Can we make it nicer?
@@ -298,6 +325,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       .asInstanceOf[FinishApplicationMasterRequest]
     finishReq.setAppAttemptId(appAttemptId)
     finishReq.setFinishApplicationStatus(status)
+    // set tracking url to empty since we don't have a history server
+    finishReq.setTrackingUrl("")
     resourceManager.finishApplicationMaster(finishReq)
   }
 
diff --git a/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
index b0af8baf08..1f235cef88 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -479,6 +479,15 @@ object YarnAllocationHandler {
   private val hostToRack = new ConcurrentHashMap[String, String]()
   private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
 
+
+  def newAllocator(conf: Configuration,
+    resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
+    args: ApplicationMasterArguments): YarnAllocationHandler = {
+
+    new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
+      args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]())
+  }
+
   def newAllocator(conf: Configuration,
     resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
@@ -486,7 +495,6 @@ object YarnAllocationHandler {
     val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
 
-
     new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
       args.workerMemory, args.workerCores, hostToCount, rackToCount)
   }
diff --git a/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
index bb58353e0c..58a3f4043a 100644
--- a/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration
  */
 private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
 
+  logInfo("Created YarnClusterScheduler")
+
   def this(sc: SparkContext) = this(sc, new Configuration())
 
   // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
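
One operational note from the diff: the previously hard-coded retry counts are now read from system properties with a default of "10", and the diff spells the key two ways, `spark.yarn.applicationMaster.waitTries` for the driver-reachability loop and `spark.yarn.ApplicationMaster.waitTries` for the SparkContext wait loop. The sketch below is illustrative only; the `-D` flag mentioned in the comments is an assumption about how the ApplicationMaster JVM might be launched, not something this patch adds.

```scala
// Illustration only: raise the retry counts the patch reads via System.getProperty.
// Both spellings from the diff are set, since the two loops read different keys.
object WaitTriesExample {
  def main(args: Array[String]): Unit = {
    // Alternatively, the property could be passed as
    // -Dspark.yarn.applicationMaster.waitTries=30 on the AM's java command line.
    System.setProperty("spark.yarn.applicationMaster.waitTries", "30")  // driver-reachability loop
    System.setProperty("spark.yarn.ApplicationMaster.waitTries", "30")  // SparkContext wait loop

    // This is the same read the patched code performs:
    val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
    println("will retry up to " + numTries + " times")
  }
}
```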