aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/SparkUI.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIUtils.scala45
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala2
-rw-r--r--docs/running-on-yarn.md4
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala63
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala10
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala2
10 files changed, 104 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index d003bf1bba..3f8a96fc47 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend(
"org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(null)
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
- sc.ui.appUIAddress)
+ sc.ui.appHttpUIAddress)
client = new Client(sc.env.actorSystem, master, appDesc, this)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ad456ea565..5d54420e72 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -79,7 +79,9 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
server.foreach(_.stop())
}
- private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
+ private[spark] def appHttpUIAddress = "http://" + appUIAddress
+ private[spark] def appUIAddress = host + ":" + boundPort.getOrElse("-1")
+
}
private[spark] object SparkUI {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index ce1acf564c..9190b8ac2c 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -25,38 +25,57 @@ import org.apache.spark.SparkContext
private[spark] object UIUtils {
import Page._
+ // Yarn has to go through a proxy so the base uri is provided and has to be on all links
+ private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
+ getOrElse("")
+
+ def addBaseUri(resource: String = ""): String = {
+ return uiRoot + resource
+ }
+
+ private[spark] val storageStr = addBaseUri("/storage")
+ private[spark] val stagesStr = addBaseUri("/stages")
+ private[spark] val envStr = addBaseUri("/environment")
+ private[spark] val executorsStr = addBaseUri("/executors")
+ private[spark] val bootstrapMinCssStr = addBaseUri("/static/bootstrap.min.css")
+ private[spark] val webuiCssStr = addBaseUri("/static/webui.css")
+ private[spark] val sortTableStr = addBaseUri("/static/sorttable.js")
+ private[spark] val sparkLogoHdStr = addBaseUri("/static/spark-logo-77x50px-hd.png")
+ private[spark] val sparkLogoStr = addBaseUri("/static/spark_logo.png")
+
+
/** Returns a spark page with correctly formatted headers */
def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
: Seq[Node] = {
val jobs = page match {
- case Stages => <li class="active"><a href="/stages">Stages</a></li>
+ case Stages => <li class="active"><a href={stagesStr}>Stages</a></li>
case _ => <li><a href="/stages">Stages</a></li>
}
val storage = page match {
- case Storage => <li class="active"><a href="/storage">Storage</a></li>
- case _ => <li><a href="/storage">Storage</a></li>
+ case Storage => <li class="active"><a href={storageStr}>Storage</a></li>
+ case _ => <li><a href={storageStr}>Storage</a></li>
}
val environment = page match {
- case Environment => <li class="active"><a href="/environment">Environment</a></li>
- case _ => <li><a href="/environment">Environment</a></li>
+ case Environment => <li class="active"><a href={envStr}>Environment</a></li>
+ case _ => <li><a href={envStr}>Environment</a></li>
}
val executors = page match {
- case Executors => <li class="active"><a href="/executors">Executors</a></li>
- case _ => <li><a href="/executors">Executors</a></li>
+ case Executors => <li class="active"><a href={executorsStr}>Executors</a></li>
+ case _ => <li><a href={executorsStr}>Executors</a></li>
}
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
- <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
- <link rel="stylesheet" href="/static/webui.css" type="text/css" />
- <script src="/static/sorttable.js"></script>
+ <link rel="stylesheet" href={bootstrapMinCssStr} type="text/css" />
+ <link rel="stylesheet" href={webuiCssStr} type="text/css" />
+ <script src={sortTableStr}></script>
<title>{sc.appName} - {title}</title>
</head>
<body>
<div class="navbar navbar-static-top">
<div class="navbar-inner">
- <a href="/" class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></a>
+ <a href="/" class="brand"><img src={sparkLogoHdStr} /></a>
<ul class="nav">
{jobs}
{storage}
@@ -86,9 +105,9 @@ private[spark] object UIUtils {
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
- <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
+ <link rel="stylesheet" href={bootstrapMinCssStr} type="text/css" />
<link rel="stylesheet" href="/static/webui.css" type="text/css" />
- <script src="/static/sorttable.js"></script>
+ <script src={sortTableStr}></script>
<title>{title}</title>
</head>
<body>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index 5670c933bd..c10ae4a372 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -23,6 +23,7 @@ import scala.xml.Node
import org.apache.spark.scheduler.Stage
import org.apache.spark.scheduler.cluster.Schedulable
+import org.apache.spark.ui.UIUtils
/** Table showing list of pools */
private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
@@ -60,7 +61,7 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
case None => 0
}
<tr>
- <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
+ <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.addBaseUri(),p.name)}>{p.name}</a></td>
<td>{p.minShare}</td>
<td>{p.weight}</td>
<td>{activeStages}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index a8911e46ae..b53c5c056a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashSet
import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import org.apache.spark.scheduler.Stage
+import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
@@ -98,7 +99,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val poolName = listener.stageToPool.get(s)
- val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
+ val nameLink = <a href={"%s/stages/stage?id=%s".format(UIUtils.addBaseUri(),s.id)}>{s.name}</a>
val description = listener.stageToDescription.get(s)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
@@ -107,7 +108,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
<tr>
<td>{s.id}</td>
{if (isFairScheduler) {
- <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
+ <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.addBaseUri(),poolName.get)}>{poolName.get}</a></td>}
}
<td>{description}</td>
<td valign="middle">{submissionTime}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
index c3ec907370..0af4b64443 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -50,7 +50,7 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
def rddRow(rdd: RDDInfo): Seq[Node] = {
<tr>
<td>
- <a href={"/storage/rdd?id=%s".format(rdd.id)}>
+ <a href={"%s/storage/rdd?id=%s".format(addBaseUri(),rdd.id)}>
{rdd.name}
</a>
</td>
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index fe5334ffdc..fded896185 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -29,8 +29,12 @@ If you want to test out the YARN deployment mode, you can use the current Spark
Most of the configs are the same for Spark on YARN as other deploys. See the Configuration page for more information on those. These are configs that are specific to SPARK on YARN.
+Environment variables:
* `SPARK_YARN_USER_ENV`, to add environment variables to the Spark processes launched on YARN. This can be a comma separated list of environment variables, e.g. `SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"`.
+System Properties:
+* 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the the spark master and then also the number of tries it waits for the Spark Context to be intialized. Default is 10.
+
# Launching Spark on YARN
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster.
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 139a977a03..bbeca245a8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -45,6 +45,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
+ private var uiAddress: String = ""
+
def run() {
// setup the directories so things go to yarn approved directories rather
@@ -53,21 +55,21 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
+ // TODO: Uncomment when hadoop is on a version which has this fixed.
// Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+ //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
- if (minimumMemory > 0) {
- val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+ //if (minimumMemory > 0) {
+ // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
- if (numCore > 0) {
+ // if (numCore > 0) {
// do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
// TODO: Uncomment when hadoop is on a version which has this fixed.
// args.workerCores = numCore
- }
- }
+ // }
+ //}
// Workaround until hadoop moves to something which has
// https://issues.apache.org/jira/browse/HADOOP-8406
@@ -83,6 +85,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
waitForSparkMaster()
+
+ waitForSparkContextInitialized()
+
+ // do this after spark master is up and SparkContext is created so that we can register UI Url
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
// Allocate all containers
allocateWorkers()
@@ -134,8 +141,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
- // What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl("")
+ appMasterRequest.setTrackingUrl(uiAddress)
return resourceManager.registerApplicationMaster(appMasterRequest)
}
@@ -143,7 +149,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
var tries = 0
- while(!driverUp && tries < 10) {
+ val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+ while(!driverUp && tries < numTries) {
val driverHost = System.getProperty("spark.driver.host")
val driverPort = System.getProperty("spark.driver.port")
try {
@@ -189,24 +196,44 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
return t
}
- private def allocateWorkers() {
+ // this need to happen before allocateWorkers
+ private def waitForSparkContextInitialized() {
logInfo("Waiting for spark context initialization")
-
try {
var sparkContext: SparkContext = null
ApplicationMaster.sparkContextRef.synchronized {
var count = 0
- while (ApplicationMaster.sparkContextRef.get() == null && count < 10) {
+ val waitTime = 10000L
+ val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
- ApplicationMaster.sparkContextRef.wait(10000L)
+ ApplicationMaster.sparkContextRef.wait(waitTime)
}
sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, sparkContext.preferredNodeLocationData)
+ assert(sparkContext != null || count >= numTries)
+
+ if (null != sparkContext) {
+ uiAddress = sparkContext.ui.appUIAddress
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
+ sparkContext.preferredNodeLocationData)
+ } else {
+ logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
+ ", numTries = " + numTries)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+ }
}
+ } finally {
+ // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
+ // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+ ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ }
+ }
+
+ private def allocateWorkers() {
+ try {
logInfo("Allocating " + args.numWorkers + " workers.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
@@ -298,6 +325,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+ // set tracking url to empty since we don't have a history server
+ finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 26ff214e12..0a3b3abc74 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -479,6 +479,15 @@ object YarnAllocationHandler {
private val hostToRack = new ConcurrentHashMap[String, String]()
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
+ def newAllocator(conf: Configuration,
+ resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments): YarnAllocationHandler = {
+
+ new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
+ args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]())
+ }
+
def newAllocator(conf: Configuration,
resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
@@ -486,7 +495,6 @@ object YarnAllocationHandler {
val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
args.workerMemory, args.workerCores, hostToCount, rackToCount)
}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 3828ddfc4f..bbc96cfef7 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration
*/
private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+ logInfo("Created YarnClusterScheduler")
+
def this(sc: SparkContext) = this(sc, new Configuration())
// Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate