Diffstat (limited to 'yarn')
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                    24
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala                  14
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala      4
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala     5
4 files changed, 25 insertions(+), 22 deletions(-)
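
Taken as a whole, the change inverts how the history server link reaches YARN: instead of the driver precomputing a URL from the event log directory and handing it to the ApplicationMaster, the AM now derives the link itself from spark.yarn.historyServer.address plus the YARN application ID, and it publishes that ID (as spark.yarn.app.id in cluster mode, or through the backend's appId field in client mode) so the scheduler backends can expose it. A minimal sketch of the resulting link, with illustrative values only; HistoryServer.UI_PATH_PREFIX is "/history" in this version of Spark:

// Illustrative values; the config key and prefix come from the diff below.
val historyServer = "http://hs.example.com:18080" // spark.yarn.historyServer.address
val appId = "application_1400000000000_0001"      // YARN application ID
val link = s"${historyServer}/history/${appId}"
// => http://hs.example.com:18080/history/application_1400000000000_0001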
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8c54840971..98039a20de 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
@@ -70,6 +71,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private val sparkContextRef = new AtomicReference[SparkContext](null)
final def run(): Int = {
+ val appAttemptId = client.getAttemptId()
+
if (isDriver) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
@@ -77,9 +80,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// Set the master property to match the requested mode.
System.setProperty("spark.master", "yarn-cluster")
+
+ // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
+ System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
}
- logInfo("ApplicationAttemptId: " + client.getAttemptId())
+ logInfo("ApplicationAttemptId: " + appAttemptId)
val cleanupHook = new Runnable {
override def run() {
@@ -151,13 +157,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkContextRef.compareAndSet(sc, null)
}
- private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
+ private def registerAM(uiAddress: String) = {
val sc = sparkContextRef.get()
+
+ val appId = client.getAttemptId().getApplicationId().toString()
+ val historyAddress =
+ sparkConf.getOption("spark.yarn.historyServer.address")
+ .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
+ .getOrElse("")
+
allocator = client.register(yarnConf,
if (sc != null) sc.getConf else sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
uiAddress,
- uiHistoryAddress)
+ historyAddress)
allocator.allocateResources()
reporterThread = launchReporterThread()
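
The resolution above is total: with spark.yarn.historyServer.address unset, getOrElse("") registers an empty string and YARN shows no history link, matching the old behavior. A hedged worked example of the set case (host and ID illustrative, SparkConf wiring assumed):

val sparkConf = new org.apache.spark.SparkConf()
  .set("spark.yarn.historyServer.address", "http://hs.example.com:18080")
val appId = "application_1400000000000_0001"
val historyAddress = sparkConf.getOption("spark.yarn.historyServer.address")
  .map(address => s"${address}/history/${appId}") // UI_PATH_PREFIX == "/history"
  .getOrElse("")
// historyAddress == "http://hs.example.com:18080/history/application_1400000000000_0001"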
@@ -175,7 +188,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc == null) {
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
} else {
- registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
+ registerAM(sc.ui.appUIHostPort)
try {
userThread.join()
} finally {
@@ -190,8 +203,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
conf = sparkConf, securityManager = securityMgr)._1
actor = waitForSparkDriver()
addAmIpFilter()
- registerAM(sparkConf.get("spark.driver.appUIAddress", ""),
- sparkConf.get("spark.driver.appUIHistoryAddress", ""))
+ registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
// In client mode the actor will stop the reporter thread.
reporterThread.join()
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index ffe2731ca1..dc77f12364 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -156,19 +155,6 @@ object YarnSparkHadoopUtil {
}
}
- def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = {
- val eventLogDir = sc.eventLogger match {
- case Some(logger) => logger.getApplicationLogDir()
- case None => ""
- }
- val historyServerAddress = conf.get("spark.yarn.historyServer.address", "")
- if (historyServerAddress != "" && eventLogDir != "") {
- historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir"
- } else {
- ""
- }
- }
-
/**
* Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
* using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The
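
Removing getUIHistoryAddress changes what the history link is keyed on. A hedged before/after sketch (host and IDs illustrative):

// Before: the link used the event log directory name, so it was empty
// whenever sc.eventLogger was None (event logging disabled):
//   "http://hs:18080" + "/history" + "/" + eventLogDir
// After (now computed in ApplicationMaster.registerAM): the link uses the
// YARN application ID, which exists whether or not event logging is on:
//   "http://hs:18080" + "/history" + "/" + appId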
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index a5f537dd9d..41c662cd7a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -56,7 +56,6 @@ private[spark] class YarnClientSchedulerBackend(
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
- conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
@@ -150,4 +149,7 @@ private[spark] class YarnClientSchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
+
+ override def applicationId(): Option[String] = Option(appId).map(_.toString())
+
}
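
The new override surfaces the backend's appId field (set when start() submits the application) through the common scheduler backend surface. A hypothetical usage sketch; the caller shown is an assumption, not part of the diff:

// Option(appId) in the override guards against reads before submission.
val backend: YarnClientSchedulerBackend = ??? // wiring omitted in this sketch
backend.applicationId() match {
  case Some(id) => println(s"running as YARN application ${id}")
  case None     => println("application not yet submitted to YARN")
}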
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 55665220a6..39436d0999 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -28,7 +28,7 @@ private[spark] class YarnClusterSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
var totalExpectedExecutors = 0
-
+
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
}
@@ -47,4 +47,7 @@ private[spark] class YarnClusterSchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
+
+ override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id")
+
}
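
This cluster-mode override closes the loop on the spark.yarn.app.id property set earlier in ApplicationMaster.run(). A minimal round-trip sketch, assuming cluster mode where the AM and the driver share one JVM (no YARN cluster needed to see the mechanism):

// The AM sets the property before the user code creates the SparkContext:
System.setProperty("spark.yarn.app.id", "application_1400000000000_0001")
// SparkConf copies spark.* system properties at construction, so the
// driver's conf carries the key and the backend can read it back:
val conf = new org.apache.spark.SparkConf()
assert(conf.getOption("spark.yarn.app.id") == Some("application_1400000000000_0001"))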