aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-09-03 14:29:10 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-09-03 14:29:10 -0700
commit19f70273d2518456ea217329744a7d23c86d5a13 (patch)
treee660a76ed5ff851483f8e2495669a84042219b9d
parent68df2464d1146f0eb3115859b44b7f3c5112505e (diff)
parent41c1b5b9a0d8155c94cf1eb4c34d2b2db41b0d8f (diff)
downloadspark-19f70273d2518456ea217329744a7d23c86d5a13.tar.gz
spark-19f70273d2518456ea217329744a7d23c86d5a13.tar.bz2
spark-19f70273d2518456ea217329744a7d23c86d5a13.zip
Merge pull request #878 from tgravescs/yarnUILink
Link the Spark UI up to the Yarn UI
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/SparkUI.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIUtils.scala39
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala2
-rw-r--r--docs/running-on-yarn.md10
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala78
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala2
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala5
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala2
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala3
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala13
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala3
17 files changed, 117 insertions, 61 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f2641851cb..89318712a5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -196,7 +196,7 @@ class SparkContext(
case "yarn-standalone" =>
val scheduler = try {
- val clazz = Class.forName("spark.scheduler.cluster.YarnClusterScheduler")
+ val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(this).asInstanceOf[ClusterScheduler]
} catch {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 478e5a0aaf..29968c273c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -62,7 +62,7 @@ class SparkEnv (
val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if(yarnMode) {
try {
- Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+ Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index d003bf1bba..9a2cf20de7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend(
"org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(null)
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
- sc.ui.appUIAddress)
+ "http://" + sc.ui.appUIAddress)
client = new Client(sc.env.actorSystem, master, appDesc, this)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ad456ea565..48eb096063 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -79,7 +79,8 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
server.foreach(_.stop())
}
- private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
+ private[spark] def appUIAddress = host + ":" + boundPort.getOrElse("-1")
+
}
private[spark] object SparkUI {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index ce1acf564c..5573b3847b 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -25,38 +25,45 @@ import org.apache.spark.SparkContext
private[spark] object UIUtils {
import Page._
+ // Yarn has to go through a proxy so the base uri is provided and has to be on all links
+ private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).
+ getOrElse("")
+
+ def prependBaseUri(resource: String = "") = uiRoot + resource
+
/** Returns a spark page with correctly formatted headers */
def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
: Seq[Node] = {
val jobs = page match {
- case Stages => <li class="active"><a href="/stages">Stages</a></li>
- case _ => <li><a href="/stages">Stages</a></li>
+ case Stages => <li class="active"><a href={prependBaseUri("/stages")}>Stages</a></li>
+ case _ => <li><a href={prependBaseUri("/stages")}>Stages</a></li>
}
val storage = page match {
- case Storage => <li class="active"><a href="/storage">Storage</a></li>
- case _ => <li><a href="/storage">Storage</a></li>
+ case Storage => <li class="active"><a href={prependBaseUri("/storage")}>Storage</a></li>
+ case _ => <li><a href={prependBaseUri("/storage")}>Storage</a></li>
}
val environment = page match {
- case Environment => <li class="active"><a href="/environment">Environment</a></li>
- case _ => <li><a href="/environment">Environment</a></li>
+ case Environment =>
+ <li class="active"><a href={prependBaseUri("/environment")}>Environment</a></li>
+ case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
}
val executors = page match {
- case Executors => <li class="active"><a href="/executors">Executors</a></li>
- case _ => <li><a href="/executors">Executors</a></li>
+ case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
+ case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
}
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
- <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
- <link rel="stylesheet" href="/static/webui.css" type="text/css" />
- <script src="/static/sorttable.js"></script>
+ <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+ <link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" />
+ <script src={prependBaseUri("/static/sorttable.js")} ></script>
<title>{sc.appName} - {title}</title>
</head>
<body>
<div class="navbar navbar-static-top">
<div class="navbar-inner">
- <a href="/" class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></a>
+ <a href={prependBaseUri("/")} class="brand"><img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} /></a>
<ul class="nav">
{jobs}
{storage}
@@ -86,9 +93,9 @@ private[spark] object UIUtils {
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
- <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" />
- <link rel="stylesheet" href="/static/webui.css" type="text/css" />
- <script src="/static/sorttable.js"></script>
+ <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} type="text/css" />
+ <link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" />
+ <script src={prependBaseUri("/static/sorttable.js")} ></script>
<title>{title}</title>
</head>
<body>
@@ -96,7 +103,7 @@ private[spark] object UIUtils {
<div class="row-fluid">
<div class="span12">
<h3 style="vertical-align: middle; display: inline-block;">
- <img src="/static/spark-logo-77x50px-hd.png" style="margin-right: 15px;" />
+ <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} style="margin-right: 15px;" />
{title}
</h3>
</div>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index 5670c933bd..b3d3666944 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -23,6 +23,7 @@ import scala.xml.Node
import org.apache.spark.scheduler.Stage
import org.apache.spark.scheduler.cluster.Schedulable
+import org.apache.spark.ui.UIUtils
/** Table showing list of pools */
private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
@@ -60,7 +61,7 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
case None => 0
}
<tr>
- <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
+ <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>{p.name}</a></td>
<td>{p.minShare}</td>
<td>{p.weight}</td>
<td>{activeStages}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index a8911e46ae..399a89459b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashSet
import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import org.apache.spark.scheduler.Stage
+import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
@@ -98,7 +99,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val poolName = listener.stageToPool.get(s)
- val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
+ val nameLink = <a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(),s.id)}>{s.name}</a>
val description = listener.stageToDescription.get(s)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis())
@@ -107,7 +108,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
<tr>
<td>{s.id}</td>
{if (isFairScheduler) {
- <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
+ <td><a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),poolName.get)}>{poolName.get}</a></td>}
}
<td>{description}</td>
<td valign="middle">{submissionTime}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
index c3ec907370..109a7d4094 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -50,7 +50,7 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
def rddRow(rdd: RDDInfo): Seq[Node] = {
<tr>
<td>
- <a href={"/storage/rdd?id=%s".format(rdd.id)}>
+ <a href={"%s/storage/rdd?id=%s".format(prependBaseUri(),rdd.id)}>
{rdd.name}
</a>
</td>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bb47fc0a2c..468800b2bd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -613,7 +613,7 @@ private[spark] object Utils extends Logging {
* A regular expression to match classes of the "core" Spark API that we want to skip when
* finding the call site of a method.
*/
- private val SPARK_CLASS_REGEX = """^spark(\.api\.java)?(\.rdd)?\.[A-Z]""".r
+ private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
val firstUserLine: Int, val firstUserClass: String)
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index fe5334ffdc..93421efcbc 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -29,8 +29,12 @@ If you want to test out the YARN deployment mode, you can use the current Spark
Most of the configs are the same for Spark on YARN as other deploys. See the Configuration page for more information on those. These are configs that are specific to SPARK on YARN.
+Environment variables:
* `SPARK_YARN_USER_ENV`, to add environment variables to the Spark processes launched on YARN. This can be a comma separated list of environment variables, e.g. `SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"`.
+System Properties:
+* 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the the spark master and then also the number of tries it waits for the Spark Context to be intialized. Default is 10.
+
# Launching Spark on YARN
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the hadoop cluster.
@@ -38,7 +42,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t
The command to launch the YARN Client is as follows:
- SPARK_JAR=<SPARK_YARN_JAR_FILE> ./spark-class spark.deploy.yarn.Client \
+ SPARK_JAR=<SPARK_YARN_JAR_FILE> ./spark-class org.apache.spark.deploy.yarn.Client \
--jar <YOUR_APP_JAR_FILE> \
--class <APP_MAIN_CLASS> \
--args <APP_MAIN_ARGUMENTS> \
@@ -50,9 +54,9 @@ The command to launch the YARN Client is as follows:
For example:
- SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class spark.deploy.yarn.Client \
+ SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class org.apache.spark.deploy.yarn.Client \
--jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \
- --class spark.examples.SparkPi \
+ --class org.apache.spark.examples.SparkPi \
--args yarn-standalone \
--num-workers 3 \
--master-memory 4g \
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 139a977a03..858b58d338 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import scala.collection.JavaConversions._
-import org.apache.spark.{SparkContext, Logging, Utils}
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.util.Utils
import org.apache.hadoop.security.UserGroupInformation
import java.security.PrivilegedExceptionAction
@@ -45,6 +46,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
+ private var uiAddress: String = ""
+
def run() {
// setup the directories so things go to yarn approved directories rather
@@ -53,27 +56,25 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
- // Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+ // Workaround until hadoop moves to something which has
+ // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
+ // ignore result
+ // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
+ // Hence args.workerCores = numCore disabled above. Any better option ?
- if (minimumMemory > 0) {
- val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+ // Compute number of threads for akka
+ //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
+ //if (minimumMemory > 0) {
+ // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
- if (numCore > 0) {
+ // if (numCore > 0) {
// do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
// TODO: Uncomment when hadoop is on a version which has this fixed.
// args.workerCores = numCore
- }
- }
-
- // Workaround until hadoop moves to something which has
- // https://issues.apache.org/jira/browse/HADOOP-8406
- // ignore result
- // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
- // Hence args.workerCores = numCore disabled above. Any better option ?
+ // }
+ //}
// org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
ApplicationMaster.register(this)
@@ -83,6 +84,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
waitForSparkMaster()
+
+ waitForSparkContextInitialized()
+
+ // do this after spark master is up and SparkContext is created so that we can register UI Url
+ val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
// Allocate all containers
allocateWorkers()
@@ -134,8 +140,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
- // What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl("")
+ appMasterRequest.setTrackingUrl(uiAddress)
return resourceManager.registerApplicationMaster(appMasterRequest)
}
@@ -143,7 +148,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
var tries = 0
- while(!driverUp && tries < 10) {
+ val numTries = System.getProperty("spark.yarn.applicationMaster.waitTries", "10").toInt
+ while(!driverUp && tries < numTries) {
val driverHost = System.getProperty("spark.driver.host")
val driverPort = System.getProperty("spark.driver.port")
try {
@@ -189,24 +195,44 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
return t
}
- private def allocateWorkers() {
+ // this need to happen before allocateWorkers
+ private def waitForSparkContextInitialized() {
logInfo("Waiting for spark context initialization")
-
try {
var sparkContext: SparkContext = null
ApplicationMaster.sparkContextRef.synchronized {
var count = 0
- while (ApplicationMaster.sparkContextRef.get() == null && count < 10) {
+ val waitTime = 10000L
+ val numTries = System.getProperty("spark.yarn.ApplicationMaster.waitTries", "10").toInt
+ while (ApplicationMaster.sparkContextRef.get() == null && count < numTries) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
- ApplicationMaster.sparkContextRef.wait(10000L)
+ ApplicationMaster.sparkContextRef.wait(waitTime)
}
sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, sparkContext.preferredNodeLocationData)
+ assert(sparkContext != null || count >= numTries)
+
+ if (null != sparkContext) {
+ uiAddress = sparkContext.ui.appUIAddress
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
+ sparkContext.preferredNodeLocationData)
+ } else {
+ logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
+ ", numTries = " + numTries)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+ }
}
+ } finally {
+ // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
+ // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
+ ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
+ }
+ }
+
+ private def allocateWorkers() {
+ try {
logInfo("Allocating " + args.numWorkers + " workers.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
@@ -298,6 +324,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+ // set tracking url to empty since we don't have a history server
+ finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index f47e23b63f..f76a5ddd39 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -80,7 +80,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
System.err.println("Unknown/unsupported param " + unknownParam)
}
System.err.println(
- "Usage: spark.deploy.yarn.ApplicationMaster [options] \n" +
+ "Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 48e737ed79..844c707834 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.spark.deploy.SparkHadoopUtil
@@ -254,7 +255,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
- " spark.deploy.yarn.ApplicationMaster" +
+ " org.apache.spark.deploy.yarn.ApplicationMaster" +
" --class " + args.userClass +
" --jar " + args.userJar +
userArgsToString(args) +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 6cbfadc23b..cd651904d2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -98,7 +98,7 @@ class ClientArguments(val args: Array[String]) {
System.err.println("Unknown/unsupported param " + unknownParam)
}
System.err.println(
- "Usage: spark.deploy.yarn.Client [options] \n" +
+ "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
"Options:\n" +
" --jar JAR_PATH Path to your application's JAR file (required)\n" +
" --class CLASS_NAME Name of your application's main class (required)\n" +
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 72dcf7178e..6229167cb4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -37,7 +37,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
slaveId: String, hostname: String, workerMemory: Int, workerCores: Int)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 26ff214e12..6d6ef149cc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,7 +17,8 @@
package org.apache.spark.deploy.yarn
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
import org.apache.spark.scheduler.SplitInfo
import scala.collection
import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
@@ -479,6 +480,15 @@ object YarnAllocationHandler {
private val hostToRack = new ConcurrentHashMap[String, String]()
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+
+ def newAllocator(conf: Configuration,
+ resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
+ args: ApplicationMasterArguments): YarnAllocationHandler = {
+
+ new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
+ args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]())
+ }
+
def newAllocator(conf: Configuration,
resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
@@ -486,7 +496,6 @@ object YarnAllocationHandler {
val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
args.workerMemory, args.workerCores, hostToCount, rackToCount)
}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 3828ddfc4f..29b3f22e13 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
import org.apache.spark._
import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration
/**
@@ -27,6 +28,8 @@ import org.apache.hadoop.conf.Configuration
*/
private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+ logInfo("Created YarnClusterScheduler")
+
def this(sc: SparkContext) = this(sc, new Configuration())
// Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate