aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorY.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-08-30 15:55:32 -0500
committerY.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com>2013-08-30 15:55:32 -0500
commitbac46266a97a6096d6d772e023a3362fd48baac0 (patch)
tree26ec4fc4098b3c34a9277731c6cd7e3769b9b1e2
parent94bb7fd46e5586e1d08a99d21eecef93eeb4b97c (diff)
downloadspark-bac46266a97a6096d6d772e023a3362fd48baac0.tar.gz
spark-bac46266a97a6096d6d772e023a3362fd48baac0.tar.bz2
spark-bac46266a97a6096d6d772e023a3362fd48baac0.zip
Link the Spark UI to the Yarn UI
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala2
-rw-r--r--core/src/main/scala/spark/ui/SparkUI.scala7
-rw-r--r--core/src/main/scala/spark/ui/UIUtils.scala58
-rw-r--r--core/src/main/scala/spark/ui/jobs/PoolTable.scala3
-rw-r--r--core/src/main/scala/spark/ui/jobs/StageTable.scala5
-rw-r--r--core/src/main/scala/spark/ui/storage/IndexPage.scala2
-rw-r--r--docs/running-on-yarn.md4
-rw-r--r--yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala63
-rw-r--r--yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala10
-rw-r--r--yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala2
10 files changed, 111 insertions, 45 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 42c3b4a6cf..8bdc1469e6 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SparkDeploySchedulerBackend(
val sparkHome = sc.getSparkHome().getOrElse(
throw new IllegalArgumentException("must supply spark home for spark standalone"))
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
- sc.ui.appUIAddress)
+ sc.ui.appHttpUIAddress)
client = new Client(sc.env.actorSystem, master, appDesc, this)
client.start()
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 23ded44ba3..e078c4a6b2 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -1,5 +1,4 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
+/* * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
@@ -78,7 +77,9 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
server.foreach(_.stop())
}
- private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
+ private[spark] def appHttpUIAddress = "http://" + appUIAddress
+ private[spark] def appUIAddress = host + ":" + boundPort.getOrElse("-1")
+
}
private[spark] object SparkUI {
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index fe2afc1129..6b45679f9d 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -25,33 +25,53 @@ import spark.SparkContext
private[spark] object UIUtils {
import Page._
+ // Yarn has to go through a proxy so the base uri is provided and has to be on all links
+ private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
+ getOrElse("")
+
+ def addBaseUri(resource: String = ""): String = {
+ return uiRoot + resource
+ }
+
+ private[spark] val storageStr = addBaseUri("/storage")
+ private[spark] val stagesStr = addBaseUri("/stages")
+ private[spark] val envStr = addBaseUri("/environment")
+ private[spark] val executorsStr = addBaseUri("/executors")
+ private[spark] val bootstrapMinCssStr = addBaseUri("/static/bootstrap.min.css")
+ private[spark] val webuiCssStr = addBaseUri("/static/webui.css")
+ private[spark] val bootstrapResponsiveCssStr = addBaseUri("/static/bootstrap-responsive.min.css")
+ private[spark] val sortTableStr = addBaseUri("/static/sorttable.js")
+ private[spark] val sparkLogoHdStr = addBaseUri("/static/spark-logo-77x50px-hd.png")
+ private[spark] val sparkLogoStr = addBaseUri("/static/spark_logo.png")
+
+
/** Returns a spark page with correctly formatted headers */
def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
: Seq[Node] = {
- val jobs = page match {
- case Jobs => <li class="active"><a href="/stages">Jobs</a></li>
- case _ => <li><a href="/stages">Jobs</a></li>
- }
val storage = page match {
- case Storage => <li class="active"><a href="/storage">Storage</a></li>
- case _ => <li><a href="/storage">Storage</a></li>
+ case Storage => <li class="active"><a href={storageStr}>Storage</a></li>
+ case _ => <li><a href={storageStr}>Storage</a></li>
+ }
+ val jobs = page match {
+ case Jobs => <li class="active"><a href={stagesStr}>Jobs</a></li>
+ case _ => <li><a href={stagesStr}>Jobs</a></li>
}
val environment = page match {
- case Environment => <li class="active"><a href="/environment">Environment</a></li>
- case _ => <li><a href="/environment">Environment</a></li>
+ case Environment => <li class="active"><a href={envStr}>Environment</a></li>
+ case _ => <li><a href={envStr}>Environment</a></li>
}
val executors = page match {
- case Executors => <li class="active"><a href="/executors">Executors</a></li>
- case _ => <li><a href="/executors">Executors</a></li>
+ case Executors => <li class="active"><a href={executorsStr}>Executors</a></li>
+ case _ => <li><a href={executorsStr}>Executors</a></li>
}
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
- <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
- <link rel="stylesheet" href="/static/webui.css" type="text/css" />
- <link rel="stylesheet" href="/static/bootstrap-responsive.min.css" type="text/css" />
- <script src="/static/sorttable.js"></script>
+ <link rel="stylesheet" href={bootstrapMinCssStr} type="text/css" />
+ <link rel="stylesheet" href={webuiCssStr} type="text/css" />
+ <link rel="stylesheet" href={bootstrapResponsiveCssStr} type="text/css" />
+ <script src={sortTableStr}></script>
<title>{sc.appName} - {title}</title>
<style type="text/css">
table.sortable thead {{ cursor: pointer; }}
@@ -65,7 +85,7 @@ private[spark] object UIUtils {
<div class="navbar">
<div class="navbar-inner">
<div class="container">
- <a href="/" class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></a>
+ <a href="/" class="brand"><img src={sparkLogoHdStr} /></a>
<ul class="nav nav-pills">
{jobs}
{storage}
@@ -98,9 +118,9 @@ private[spark] object UIUtils {
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
- <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
- <link rel="stylesheet" href="/static/bootstrap-responsive.min.css" type="text/css" />
- <script src="/static/sorttable.js"></script>
+ <link rel="stylesheet" href={bootstrapMinCssStr} type="text/css" />
+ <link rel="stylesheet" href={bootstrapResponsiveCssStr} type="text/css" />
+ <script src={sortTableStr}></script>
<title>{title}</title>
<style type="text/css">
table.sortable thead {{ cursor: pointer; }}
@@ -110,7 +130,7 @@ private[spark] object UIUtils {
<div class="container">
<div class="row">
<div class="span2">
- <img src="/static/spark_logo.png" />
+ <img src={sparkLogoStr} />
</div>
<div class="span10">
<h3 style="vertical-align: bottom; margin-top: 40px; display: inline-block;">
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
index 621828f9c3..a6bd734e29 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -6,6 +6,7 @@ import scala.xml.Node
import spark.scheduler.Stage
import spark.scheduler.cluster.Schedulable
+import spark.ui.UIUtils
/** Table showing list of pools */
private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
@@ -43,7 +44,7 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
case None => 0
}
<tr>
- <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
+ <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.addBaseUri(),p.name)}>{p.name}</a></td>
<td>{p.minShare}</td>
<td>{p.weight}</td>
<td>{activeStages}</td>
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index b31f4abc26..f3d870c813 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -8,6 +8,7 @@ import scala.collection.mutable.HashSet
import spark.Utils
import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import spark.scheduler.Stage
+import spark.ui.UIUtils
/** Page showing list of all ongoing and recently finished stages */
@@ -81,7 +82,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val poolName = listener.stageToPool.get(s)
- val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
+ val nameLink = <a href={"%s/stages/stage?id=%s".format(UIUtils.addBaseUri(),s.id)}>{s.name}</a>
val description = listener.stageToDescription.get(s)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
@@ -90,7 +91,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
<tr>
<td>{s.id}</td>
{if (isFairScheduler) {
- <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
+ <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.addBaseUri(),poolName.get)}>{poolName.get}</a></td>}
}
<td>{description}</td>
<td valign="middle">{submissionTime}</td>
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
index 0751f9e8f9..9675f80aff 100644
--- a/core/src/main/scala/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala
@@ -50,7 +50,7 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
def rddRow(rdd: RDDInfo): Seq[Node] = {
<tr>
<td>
- <a href={"/storage/rdd?id=%s".format(rdd.id)}>
+ <a href={"%s/storage/rdd?id=%s".format(addBaseUri(),rdd.id)}>
{rdd.name}
</a>
</td>
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 678cd57aba..3f0d077f71 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -46,8 +46,12 @@ If you want to test out the YARN deployment mode, you can use the current Spark
Most of the configs are the same for Spark on YARN as other deploys. See the Configuration page for more information on those. These are configs that are specific to SPARK on YARN.
+Environment variables:
* `SPARK_YARN_USER_ENV`, to add environment variables to the Spark processes launched on YARN. This can be a comma separated list of environment variables. ie SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"
+Properties:
+* 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the the spark master and then also the number of tries it waits for the Spark Context to be intialized. Default is 10.
+
# Launching Spark on YARN
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster.
diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
index 0f3b6bc1a6..d6acb080cc 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,6 +45,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
+ private var uiAddress: String = ""
+
def run() {
// setup the directories so things go to yarn approved directories rather
@@ -53,21 +55,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ // TODO: Uncomment when hadoop is on a version which has this fixed.
// Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+ //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
- if (minimumMemory > 0) {
- val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+ //if (minimumMemory > 0) {
+ // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
- if (numCore > 0) {
+ // if (numCore > 0) {
// do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
// TODO: Uncomment when hadoop is on a version which has this fixed.
// args.workerCores = numCore
- }
- }
+ // }
+ //}
// Workaround until hadoop moves to something which has
// https://issues.apache.org/jira/browse/HADOOP-8406
@@ -83,6 +85,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
waitForSparkMaster()
+
+ waitForSparkContextInitialized()
+
+ // do this after spark master is up and SparkContext is created so that we can register UI Url
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
// Allocate all containers
allocateWorkers()
@@ -134,8 +141,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
- // What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl("")
+ appMasterRequest.setTrackingUrl(uiAddress)
return resourceManager.registerApplicationMaster(appMasterRequest)
}
@@ -143,7 +149,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
var tries = 0
- while(!driverUp && tries < 10) {
+ val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+ while(!driverUp && tries < numTries) {
val driverHost = System.getProperty("spark.driver.host")
val driverPort = System.getProperty("spark.driver.port")
try {
@@ -189,24 +196,44 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
return t
}
- private def allocateWorkers() {
+ // this need to happen before allocateWorkers
+ private def waitForSparkContextInitialized() {
logInfo("Waiting for spark context initialization")
-
try {
var sparkContext: SparkContext = null
ApplicationMaster.sparkContextRef.synchronized {
var count = 0
- while (ApplicationMaster.sparkContextRef.get() == null && count < 10) {
+ val waitTime = 10000L
+ val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
- ApplicationMaster.sparkContextRef.wait(10000L)
+ ApplicationMaster.sparkContextRef.wait(waitTime)
}
sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, sparkContext.preferredNodeLocationData)
+ assert(sparkContext != null || count >= numTries)
+
+ if (null != sparkContext) {
+ uiAddress = sparkContext.ui.appUIAddress
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
+ sparkContext.preferredNodeLocationData)
+ } else {
+ logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
+ ", numTries = " + numTries)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+ }
}
+ } finally {
+ // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
+ // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+ ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ }
+ }
+
+ private def allocateWorkers() {
+ try {
logInfo("Allocating " + args.numWorkers + " workers.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
@@ -298,6 +325,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+ // set tracking url to empty since we don't have a history server
+ finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
}
diff --git a/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
index b0af8baf08..1f235cef88 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -479,6 +479,15 @@ object YarnAllocationHandler {
private val hostToRack = new ConcurrentHashMap[String, String]()
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
+ def newAllocator(conf: Configuration,
+ resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments): YarnAllocationHandler = {
+
+ new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
+ args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]())
+ }
+
def newAllocator(conf: Configuration,
resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
@@ -486,7 +495,6 @@ object YarnAllocationHandler {
val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
args.workerMemory, args.workerCores, hostToCount, rackToCount)
}
diff --git a/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
index bb58353e0c..58a3f4043a 100644
--- a/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration
*/
private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+ logInfo("Created YarnClusterScheduler")
+
def this(sc: SparkContext) = this(sc, new Configuration())
// Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate