aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorHari Shreedharan <hshreedharan@apache.org>2015-05-21 20:24:28 -0500
committerImran Rashid <irashid@cloudera.com>2015-05-21 20:24:28 -0500
commit956c4c910cb536a02128349f2250d0a5f9924d0c (patch)
tree0b436f5f2c9ec88ad0fcfd53b575e3ef4c64ab56 /core
parent85b96372cf0fd055f89fc639f45c1f2cb02a378f (diff)
downloadspark-956c4c910cb536a02128349f2250d0a5f9924d0c.tar.gz
spark-956c4c910cb536a02128349f2250d0a5f9924d0c.tar.bz2
spark-956c4c910cb536a02128349f2250d0a5f9924d0c.zip
[SPARK-7657] [YARN] Add driver logs links in application UI, in cluster mode.
This PR adds the URLs to the driver logs to `SparkListenerApplicationStarted` event, which is later used by the `ExecutorsListener` to populate the URLs to the driver logs in its own state. This info is then used when the UI is rendered to display links to the logs. Author: Hari Shreedharan <hshreedharan@apache.org> Closes #6166 from harishreedharan/am-log-link and squashes the following commits: 943fc4f [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link 9e5c04b [Hari Shreedharan] Merge remote-tracking branch 'asf/master' into am-log-link b3f9b9d [Hari Shreedharan] Updated comment based on feedback. 0840a95 [Hari Shreedharan] Move the result and sc.stop back to original location, minor import changes. 537a2f7 [Hari Shreedharan] Add test to ensure the log urls are populated and valid. 4033725 [Hari Shreedharan] Adding comments explaining how node reports are used to get the log urls. 6c5c285 [Hari Shreedharan] Import order. 346f4ea [Hari Shreedharan] Review feedback fixes. 629c1dc [Hari Shreedharan] Cleanup. 99fb1a3 [Hari Shreedharan] Send the log urls in App start event, to ensure that other listeners are not affected. c0de336 [Hari Shreedharan] Ensure new unit test cleans up after itself. 50cdae3 [Hari Shreedharan] Added unit test, made the approach generic. 402e8e4 [Hari Shreedharan] Use `NodeReport` to get the URL for the logs. Also, make the environment variables generic so other cluster managers can use them as well. 1cf338f [Hari Shreedharan] [SPARK-7657][YARN] Add driver link in application UI, in cluster mode.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala6
5 files changed, 30 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cf3820fcb6..ad78bdfde2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1991,7 +1991,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
- startTime, sparkUser, applicationAttemptId))
+ startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
}
/** Post the application end event */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 646820520e..8801a761af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -49,4 +49,11 @@ private[spark] trait SchedulerBackend {
*/
def applicationAttemptId(): Option[String] = None
+ /**
+ * Get the URLs for the driver logs. These URLs are used to display the links in the UI
+ * Executors tab for the driver.
+ * @return Map containing the log names and their respective URLs
+ */
+ def getDriverLogUrls: Option[Map[String, String]] = None
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 169d4fd3a9..863d0befbc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -110,8 +110,13 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerApplicationStart(appName: String, appId: Option[String],
- time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent
+case class SparkListenerApplicationStart(
+ appName: String,
+ appId: Option[String],
+ time: Long,
+ sparkUser: String,
+ appAttemptId: Option[String],
+ driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 0a08b000e2..39583af143 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.exec
import scala.collection.mutable.HashMap
-import org.apache.spark.ExceptionFailure
+import org.apache.spark.{ExceptionFailure, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
@@ -73,6 +73,16 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
uiData.finishReason = Some(executorRemoved.reason)
}
+ override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
+ applicationStart.driverLogs.foreach { logs =>
+ val storageStatus = storageStatusList.find { s =>
+ s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
+ s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
+ }
+ storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
+ }
+ }
+
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 3f162d1f6c..adf69a4e78 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -196,7 +196,8 @@ private[spark] object JsonProtocol {
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
("User" -> applicationStart.sparkUser) ~
- ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
+ ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~
+ ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing))
}
def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -570,7 +571,8 @@ private[spark] object JsonProtocol {
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
- SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
+ val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson)
+ SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs)
}
def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {