path: root/yarn
author     Y.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com>  2013-08-30 15:55:32 -0500
committer  Y.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com>  2013-08-30 15:55:32 -0500
commit     bac46266a97a6096d6d772e023a3362fd48baac0 (patch)
tree       26ec4fc4098b3c34a9277731c6cd7e3769b9b1e2 /yarn
parent     94bb7fd46e5586e1d08a99d21eecef93eeb4b97c (diff)
Link the Spark UI to the Yarn UI
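
The ApplicationMaster now waits for the user's SparkContext (and hence its web UI) to come up, registers with the ResourceManager only then, and publishes the Spark UI address as the YARN tracking URL, so the YARN web UI links directly to the running application's Spark UI. A condensed, self-contained sketch of that ordering follows; everything in it is an illustrative stand-in except the property name and the 10-try / 10-second defaults, which come from this patch:

// Illustrative sketch only: stand-ins for the real Spark/YARN types (Scala 2.12+).
object TrackingUrlSketch {
  private var uiAddress: Option[String] = None

  // Stands in for the user-class thread publishing the SparkContext's UI address.
  def sparkContextInitialized(address: String): Unit = synchronized {
    uiAddress = Some(address)
    notifyAll()
  }

  // Mirrors waitForSparkContextInitialized(): a bounded wait with configurable tries.
  def waitForUi(): Option[String] = synchronized {
    val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
    val waitTime = 10000L
    var count = 0
    while (uiAddress.isEmpty && count < numTries) {
      count += 1
      wait(waitTime)
    }
    uiAddress
  }

  def main(args: Array[String]): Unit = {
    new Thread(() => {
      Thread.sleep(500) // simulate SparkContext startup in the user class
      sparkContextInitialized("http://driver-host:4040")
    }).start()

    // Register with the RM only after the UI address is known (or the wait gave up),
    // mirroring the move of registerApplicationMaster() to after the wait.
    val trackingUrl = waitForUi().getOrElse("")
    println("appMasterRequest.setTrackingUrl(\"" + trackingUrl + "\")")
  }
}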
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala           63
-rw-r--r--  yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala       10
-rw-r--r--  yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala   2
3 files changed, 57 insertions, 18 deletions
diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
index 0f3b6bc1a6..d6acb080cc 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,6 +45,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
+ private var uiAddress: String = ""
+
def run() {
// setup the directories so things go to yarn approved directories rather
@@ -53,21 +55,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ // TODO: Uncomment when hadoop is on a version which has this fixed.
// Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+ //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
- if (minimumMemory > 0) {
- val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+ //if (minimumMemory > 0) {
+ // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
- if (numCore > 0) {
+ // if (numCore > 0) {
// do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
// TODO: Uncomment when hadoop is on a version which has this fixed.
// args.workerCores = numCore
- }
- }
+ // }
+ //}
// Workaround until hadoop moves to something which has
// https://issues.apache.org/jira/browse/HADOOP-8406
@@ -83,6 +85,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// This is a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
waitForSparkMaster()
+
+ waitForSparkContextInitialized()
+
+ // Do this after the spark master is up and the SparkContext is created, so that we can register the UI URL.
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
// Allocate all containers
allocateWorkers()
@@ -134,8 +141,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
- // What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl("")
+ appMasterRequest.setTrackingUrl(uiAddress)
return resourceManager.registerApplicationMaster(appMasterRequest)
}
@@ -143,7 +149,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
var tries = 0
- while(!driverUp && tries < 10) {
+ val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+ while(!driverUp && tries < numTries) {
val driverHost = System.getProperty("spark.driver.host")
val driverPort = System.getProperty("spark.driver.port")
try {
@@ -189,24 +196,44 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
return t
}
- private def allocateWorkers() {
+ // This needs to happen before allocateWorkers.
+ private def waitForSparkContextInitialized() {
logInfo("Waiting for spark context initialization")
-
try {
var sparkContext: SparkContext = null
ApplicationMaster.sparkContextRef.synchronized {
var count = 0
- while (ApplicationMaster.sparkContextRef.get() == null && count < 10) {
+ val waitTime = 10000L
+ val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
- ApplicationMaster.sparkContextRef.wait(10000L)
+ ApplicationMaster.sparkContextRef.wait(waitTime)
}
sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, sparkContext.preferredNodeLocationData)
+ assert(sparkContext != null || count >= numTries)
+
+ if (null != sparkContext) {
+ uiAddress = sparkContext.ui.appUIAddress
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
+ sparkContext.preferredNodeLocationData)
+ } else {
+ logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
+ ", numTries = " + numTries)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+ }
}
+ } finally {
+ // In case of exceptions, ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
+ // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks.
+ ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ }
+ }
+
+ private def allocateWorkers() {
+ try {
logInfo("Allocating " + args.numWorkers + " workers.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
@@ -298,6 +325,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+ // Set the tracking URL to empty since we don't have a history server.
+ finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
}
diff --git a/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
index b0af8baf08..1f235cef88 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -479,6 +479,15 @@ object YarnAllocationHandler {
private val hostToRack = new ConcurrentHashMap[String, String]()
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
+ def newAllocator(conf: Configuration,
+ resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments): YarnAllocationHandler = {
+
+ new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
+ args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]())
+ }
+
def newAllocator(conf: Configuration,
resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
@@ -486,7 +495,6 @@ object YarnAllocationHandler {
val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
args.workerMemory, args.workerCores, hostToCount, rackToCount)
}
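
The overload added above backs the fallback in waitForSparkContextInitialized(): when the SparkContext never materializes, the ApplicationMaster still gets an allocator, just without locality hints. A hypothetical helper (not in the patch; it reuses the fields the ApplicationMaster already holds) contrasting the two factory calls:

// Hypothetical helper, not part of this patch, showing how the overloads pair up.
private def makeAllocator(sparkContext: SparkContext): YarnAllocationHandler = {
  if (sparkContext != null) {
    // Normal path: locality-aware allocation driven by the context's preferred locations.
    YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
      sparkContext.preferredNodeLocationData)
  } else {
    // Fallback path: the new overload, which passes empty host/rack weight maps.
    YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
  }
}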
diff --git a/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
index bb58353e0c..58a3f4043a 100644
--- a/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration
*/
private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+ logInfo("Created YarnClusterScheduler")
+
def this(sc: SparkContext) = this(sc, new Configuration())
// Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
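
Both wait loops read their retry limit from a plain JVM system property with a default of 10, so a slow cluster can raise it by defining the property before the ApplicationMaster starts (e.g. with the JVM's -D flag). A minimal, runnable illustration of the lookup the patch performs (the surrounding object is illustrative only):

object WaitTriesDemo {
  def main(args: Array[String]): Unit = {
    // Equivalent to launching the JVM with -Dspark.yarn.applicationMaster.waitTries=30.
    System.setProperty("spark.yarn.applicationMaster.waitTries", "30")

    // The exact lookup added by the patch: string-valued property, "10" as the default.
    val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
    println(s"ApplicationMaster will wait up to $numTries tries")
  }
}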