aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorHari Shreedharan <hshreedharan@apache.org>2015-05-21 20:24:28 -0500
committerImran Rashid <irashid@cloudera.com>2015-05-21 20:24:28 -0500
commit956c4c910cb536a02128349f2250d0a5f9924d0c (patch)
tree0b436f5f2c9ec88ad0fcfd53b575e3ef4c64ab56 /yarn
parent85b96372cf0fd055f89fc639f45c1f2cb02a378f (diff)
downloadspark-956c4c910cb536a02128349f2250d0a5f9924d0c.tar.gz
spark-956c4c910cb536a02128349f2250d0a5f9924d0c.tar.bz2
spark-956c4c910cb536a02128349f2250d0a5f9924d0c.zip
[SPARK-7657] [YARN] Add driver logs links in application UI, in cluster mode.
This PR adds the URLs to the driver logs to `SparkListenerApplicationStarted` event, which is later used by the `ExecutorsListener` to populate the URLs to the driver logs in its own state. This info is then used when the UI is rendered to display links to the logs. Author: Hari Shreedharan <hshreedharan@apache.org> Closes #6166 from harishreedharan/am-log-link and squashes the following commits: 943fc4f [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link 9e5c04b [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link b3f9b9d [Hari Shreedharan] Updated comment based on feedback. 0840a95 [Hari Shreedharan] Move the result and sc.stop back to original location, minor import changes. 537a2f7 [Hari Shreedharan] Add test to ensure the log urls are populated and valid. 4033725 [Hari Shreedharan] Adding comments explaining how node reports are used to get the log urls. 6c5c285 [Hari Shreedharan] Import order. 346f4ea [Hari Shreedharan] Review feedback fixes. 629c1dc [Hari Shreedharan] Cleanup. 99fb1a3 [Hari Shreedharan] Send the log urls in App start event, to ensure that other listeners are not affected. c0de336 [Hari Shreedharan] Ensure new unit test cleans up after itself. 50cdae3 [Hari Shreedharan] Added unit test, made the approach generic. 402e8e4 [Hari Shreedharan] Use `NodeReport` to get the URL for the logs. Also, make the environment variables generic so other cluster managers can use them as well. 1cf338f [Hari Shreedharan] [SPARK-7657][YARN] Add driver link in application UI, in cluster mode.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala4
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala7
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala77
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala24
4 files changed, 106 insertions, 6 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index b134751366..ffe71dfd7d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -89,9 +89,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
/** Returns the attempt ID. */
def getAttemptId(): ApplicationAttemptId = {
- val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
- val containerId = ConverterUtils.toContainerId(containerIdString)
- containerId.getApplicationAttemptId()
+ YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId()
}
/** Returns the configuration for the AmIpFilter to add to the Spark UI. */
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index ba91872107..5e6531895c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
+import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
+import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -136,6 +137,10 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
tokenRenewer.foreach(_.stop())
}
+ private[spark] def getContainerId: ContainerId = {
+ val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+ ConverterUtils.toContainerId(containerIdString)
+ }
}
object YarnSparkHadoopUtil {
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index aeb218a575..1ace1a97d5 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -17,10 +17,19 @@
package org.apache.spark.scheduler.cluster
+import java.net.NetworkInterface
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.yarn.api.records.NodeState
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.IntParam
+import org.apache.spark.util.{IntParam, Utils}
private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
@@ -53,4 +62,70 @@ private[spark] class YarnClusterSchedulerBackend(
logError("Application attempt ID is not set.")
super.applicationAttemptId
}
+
+ override def getDriverLogUrls: Option[Map[String, String]] = {
+ var yarnClientOpt: Option[YarnClient] = None
+ var driverLogs: Option[Map[String, String]] = None
+ try {
+ val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
+ val containerId = YarnSparkHadoopUtil.get.getContainerId
+ yarnClientOpt = Some(YarnClient.createYarnClient())
+ yarnClientOpt.foreach { yarnClient =>
+ yarnClient.init(yarnConf)
+ yarnClient.start()
+
+ // For newer versions of YARN, we can find the HTTP address for a given node by getting a
+ // container report for a given container. But container reports came only in Hadoop 2.4,
+ // so we basically have to get the node reports for all nodes and find the one which runs
+ // this container. For that we have to compare the node's host against the current host.
+ // Since the host can have multiple addresses, we need to compare against all of them to
+ // find out if one matches.
+
+ // Get all the addresses of this node.
+ val addresses =
+ NetworkInterface.getNetworkInterfaces.asScala
+ .flatMap(_.getInetAddresses.asScala)
+ .toSeq
+
+ // Find a node report that matches one of the addresses
+ val nodeReport =
+ yarnClient.getNodeReports(NodeState.RUNNING).asScala.find { x =>
+ val host = x.getNodeId.getHost
+ addresses.exists { address =>
+ address.getHostAddress == host ||
+ address.getHostName == host ||
+ address.getCanonicalHostName == host
+ }
+ }
+
+ // Now that we have found the report for the Node Manager that the AM is running on, we
+ // can get the base HTTP address for the Node manager from the report.
+ // The format used for the logs for each container is well-known and can be constructed
+ // using the NM's HTTP address and the container ID.
+ // The NM may be running several containers, but we can build the URL for the AM using
+ // the AM's container ID, which we already know.
+ nodeReport.foreach { report =>
+ val httpAddress = report.getHttpAddress
+ // lookup appropriate http scheme for container log urls
+ val yarnHttpPolicy = yarnConf.get(
+ YarnConfiguration.YARN_HTTP_POLICY_KEY,
+ YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
+ )
+ val user = Utils.getCurrentUserName()
+ val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
+ val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
+ logDebug(s"Base URL for logs: $baseUrl")
+ driverLogs = Some(
+ Map("stderr" -> s"$baseUrl/stderr?start=0", "stdout" -> s"$baseUrl/stdout?start=0"))
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logInfo("Node Report API is not available in the version of YARN being used, so AM" +
+ " logs link will not appear in application UI", e)
+ } finally {
+ yarnClientOpt.foreach(_.close())
+ }
+ driverLogs
+ }
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d3c606e0ed..dcaeb2e43f 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.collection.mutable
+import scala.io.Source
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.ByteStreams
@@ -33,7 +34,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener, SparkListenerExecutorAdded}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
+ SparkListenerExecutorAdded}
import org.apache.spark.util.Utils
/**
@@ -290,10 +292,15 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
private[spark] class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+ var driverLogs: Option[collection.Map[String, String]] = None
override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}
+
+ override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = {
+ driverLogs = appStart.driverLogs
+ }
}
private object YarnClusterDriver extends Logging with Matchers {
@@ -314,6 +321,7 @@ private object YarnClusterDriver extends Logging with Matchers {
val sc = new SparkContext(new SparkConf()
.set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+ val conf = sc.getConf
val status = new File(args(0))
var result = "failure"
try {
@@ -335,6 +343,20 @@ private object YarnClusterDriver extends Logging with Matchers {
executorInfos.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}
+
+ // If we are running in yarn-cluster mode, verify that driver logs are downloadable.
+ if (conf.get("spark.master") == "yarn-cluster") {
+ assert(listener.driverLogs.nonEmpty)
+ val driverLogs = listener.driverLogs.get
+ assert(driverLogs.size === 2)
+ assert(driverLogs.containsKey("stderr"))
+ assert(driverLogs.containsKey("stdout"))
+ val stderr = driverLogs("stderr") // YARN puts everything in stderr.
+ val lines = Source.fromURL(stderr).getLines()
+ // Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in
+ // cluster mode.
+ assert(lines.exists(_.contains("YarnClusterSchedulerBackend")))
+ }
}
}