diff options
Diffstat (limited to 'yarn/common/src')
4 files changed, 25 insertions, 22 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 8c54840971..98039a20de 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils} @@ -70,6 +71,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private val sparkContextRef = new AtomicReference[SparkContext](null) final def run(): Int = { + val appAttemptId = client.getAttemptId() + if (isDriver) { // Set the web ui port to be ephemeral for yarn so we don't conflict with // other spark processes running on the same box @@ -77,9 +80,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, // Set the master property to match the requested mode. System.setProperty("spark.master", "yarn-cluster") + + // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up. + System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) } - logInfo("ApplicationAttemptId: " + client.getAttemptId()) + logInfo("ApplicationAttemptId: " + appAttemptId) val cleanupHook = new Runnable { override def run() { @@ -151,13 +157,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkContextRef.compareAndSet(sc, null) } - private def registerAM(uiAddress: String, uiHistoryAddress: String) = { + private def registerAM(uiAddress: String) = { val sc = sparkContextRef.get() + + val appId = client.getAttemptId().getApplicationId().toString() + val historyAddress = + sparkConf.getOption("spark.yarn.historyServer.address") + .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" } + .getOrElse("") + allocator = client.register(yarnConf, if (sc != null) sc.getConf else sparkConf, if (sc != null) sc.preferredNodeLocationData else Map(), uiAddress, - uiHistoryAddress) + historyAddress) allocator.allocateResources() reporterThread = launchReporterThread() @@ -175,7 +188,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, if (sc == null) { finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.") } else { - registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf)) + registerAM(sc.ui.appUIHostPort) try { userThread.join() } finally { @@ -190,8 +203,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, conf = sparkConf, securityManager = securityMgr)._1 actor = waitForSparkDriver() addAmIpFilter() - registerAM(sparkConf.get("spark.driver.appUIAddress", ""), - sparkConf.get("spark.driver.appUIHistoryAddress", "")) + registerAM(sparkConf.get("spark.driver.appUIAddress", "")) // In client mode the actor will stop the reporter thread. reporterThread.join() diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index ffe2731ca1..dc77f12364 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -156,19 +155,6 @@ object YarnSparkHadoopUtil { } } - def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = { - val eventLogDir = sc.eventLogger match { - case Some(logger) => logger.getApplicationLogDir() - case None => "" - } - val historyServerAddress = conf.get("spark.yarn.historyServer.address", "") - if (historyServerAddress != "" && eventLogDir != "") { - historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir" - } else { - "" - } - } - /** * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index a5f537dd9d..41c662cd7a 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -56,7 +56,6 @@ private[spark] class YarnClientSchedulerBackend( val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort) - conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf)) val argsArrayBuf = new ArrayBuffer[String]() argsArrayBuf += ( @@ -150,4 +149,7 @@ private[spark] class YarnClientSchedulerBackend( override def sufficientResourcesRegistered(): Boolean = { totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } + + override def applicationId(): Option[String] = Option(appId).map(_.toString()) + } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 55665220a6..39436d0999 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -28,7 +28,7 @@ private[spark] class YarnClusterSchedulerBackend( extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { var totalExpectedExecutors = 0 - + if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { minRegisteredRatio = 0.8 } @@ -47,4 +47,7 @@ private[spark] class YarnClusterSchedulerBackend( override def sufficientResourcesRegistered(): Boolean = { totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } + + override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id") + } |