diff options
author | Hari Shreedharan <hshreedharan@apache.org> | 2015-05-21 20:24:28 -0500 |
---|---|---|
committer | Imran Rashid <irashid@cloudera.com> | 2015-05-21 20:24:28 -0500 |
commit | 956c4c910cb536a02128349f2250d0a5f9924d0c (patch) | |
tree | 0b436f5f2c9ec88ad0fcfd53b575e3ef4c64ab56 /core/src/main | |
parent | 85b96372cf0fd055f89fc639f45c1f2cb02a378f (diff) | |
download | spark-956c4c910cb536a02128349f2250d0a5f9924d0c.tar.gz spark-956c4c910cb536a02128349f2250d0a5f9924d0c.tar.bz2 spark-956c4c910cb536a02128349f2250d0a5f9924d0c.zip |
[SPARK-7657] [YARN] Add driver logs links in application UI, in cluster mode.
This PR adds the URLs to the driver logs to `SparkListenerApplicationStarted` event, which is later used by the `ExecutorsListener` to populate the URLs to the driver logs in its own state. This info is then used when the UI is rendered to display links to the logs.
Author: Hari Shreedharan <hshreedharan@apache.org>
Closes #6166 from harishreedharan/am-log-link and squashes the following commits:
943fc4f [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link
9e5c04b [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link
b3f9b9d [Hari Shreedharan] Updated comment based on feedback.
0840a95 [Hari Shreedharan] Move the result and sc.stop back to original location, minor import changes.
537a2f7 [Hari Shreedharan] Add test to ensure the log urls are populated and valid.
4033725 [Hari Shreedharan] Adding comments explaining how node reports are used to get the log urls.
6c5c285 [Hari Shreedharan] Import order.
346f4ea [Hari Shreedharan] Review feedback fixes.
629c1dc [Hari Shreedharan] Cleanup.
99fb1a3 [Hari Shreedharan] Send the log urls in App start event, to ensure that other listeners are not affected.
c0de336 [Hari Shreedharan] Ensure new unit test cleans up after itself.
50cdae3 [Hari Shreedharan] Added unit test, made the approach generic.
402e8e4 [Hari Shreedharan] Use `NodeReport` to get the URL for the logs. Also, make the environment variables generic so other cluster managers can use them as well.
1cf338f [Hari Shreedharan] [SPARK-7657][YARN] Add driver link in application UI, in cluster mode.
Diffstat (limited to 'core/src/main')
5 files changed, 30 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cf3820fcb6..ad78bdfde2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1991,7 +1991,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // Note: this code assumes that the task scheduler has been initialized and has contacted // the cluster manager to get an application ID (in case the cluster manager provides one). listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), - startTime, sparkUser, applicationAttemptId)) + startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls)) } /** Post the application end event */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 646820520e..8801a761af 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -49,4 +49,11 @@ private[spark] trait SchedulerBackend { */ def applicationAttemptId(): Option[String] = None + /** + * Get the URLs for the driver logs. These URLs are used to display the links in the UI + * Executors tab for the driver. + * @return Map containing the log names and their respective URLs + */ + def getDriverLogUrls: Option[Map[String, String]] = None + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 169d4fd3a9..863d0befbc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -110,8 +110,13 @@ case class SparkListenerExecutorMetricsUpdate( extends SparkListenerEvent @DeveloperApi -case class SparkListenerApplicationStart(appName: String, appId: Option[String], - time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent +case class SparkListenerApplicationStart( + appName: String, + appId: Option[String], + time: Long, + sparkUser: String, + appAttemptId: Option[String], + driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent @DeveloperApi case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 0a08b000e2..39583af143 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -19,7 +19,7 @@ package org.apache.spark.ui.exec import scala.collection.mutable.HashMap -import org.apache.spark.ExceptionFailure +import org.apache.spark.{ExceptionFailure, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageStatus, StorageStatusListener} @@ -73,6 +73,16 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp uiData.finishReason = Some(executorRemoved.reason) } + override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { + applicationStart.driverLogs.foreach { logs => + val storageStatus = storageStatusList.find { s => + s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER || + s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER + } + storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap } + } + } + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { val eid = taskStart.taskInfo.executorId executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 3f162d1f6c..adf69a4e78 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -196,7 +196,8 @@ private[spark] object JsonProtocol { ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~ ("Timestamp" -> applicationStart.time) ~ ("User" -> applicationStart.sparkUser) ~ - ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) + ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~ + ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing)) } def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = { @@ -570,7 +571,8 @@ private[spark] object JsonProtocol { val time = (json \ "Timestamp").extract[Long] val sparkUser = (json \ "User").extract[String] val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String]) - SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId) + val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson) + SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs) } def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = { |