diff options
Diffstat (limited to 'core/src/main/scala/org')
11 files changed, 68 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index bc9f78b9e5..0add3064da 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -43,6 +43,7 @@ private[spark] class ExecutorRunner( val worker: ActorRef, val workerId: String, val host: String, + val webUiPort: Int, val sparkHome: File, val executorDir: File, val workerUrl: String, @@ -134,6 +135,12 @@ private[spark] class ExecutorRunner( // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") + + // Add webUI log urls + val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" + builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr") + builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout") + process = builder.start() val header = "Spark Executor Command: %s\n%s\n\n".format( command.mkString("\"", "\" \"", "\""), "=" * 40) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index b20f5c0c82..10929eb516 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -362,6 +362,7 @@ private[spark] class Worker( self, workerId, host, + webUiPort, sparkHome, executorDir, akkaUrl, diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index bc72c89703..3a42f8b157 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend( override def preStart() { logInfo("Connecting to driver: " + driverUrl) driver = context.actorSelection(driverUrl) - driver ! RegisterExecutor(executorId, hostPort, cores) + driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } + def extractLogUrls: Map[String, String] = { + val prefix = "SPARK_LOG_URL_" + sys.env.filterKeys(_.startsWith(prefix)) + .map(e => (e._1.substring(prefix.length).toLowerCase, e._2)) + } + override def receiveWithLogging = { case RegisteredExecutor => logInfo("Successfully registered with driver") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 1da6fe976d..9bf74f4be1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages { case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage // Executors to driver - case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) + case class RegisterExecutor( + executorId: String, + hostPort: String, + cores: Int, + logUrls: Map[String, String]) extends CoarseGrainedClusterMessage { Utils.checkHostPort(hostPort, "Expected host port") } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 103a5c053c..9d2fb4f3b4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste } def receiveWithLogging = { - case RegisterExecutor(executorId, hostPort, cores) => + case RegisterExecutor(executorId, hostPort, cores, logUrls) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) if (executorDataMap.contains(executorId)) { sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) @@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val (host, _) = Utils.parseHostPort(hostPort) - val data = new ExecutorData(sender, sender.path.address, host, cores, cores) + val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls) // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index eb52ddfb1e..5e571efe76 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -33,5 +33,6 @@ private[cluster] class ExecutorData( val executorAddress: Address, override val executorHost: String, var freeCores: Int, - override val totalCores: Int -) extends ExecutorInfo(executorHost, totalCores) + override val totalCores: Int, + override val logUrlMap: Map[String, String] +) extends ExecutorInfo(executorHost, totalCores, logUrlMap) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala index b4738e64c9..7f21856614 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala @@ -25,8 +25,8 @@ import org.apache.spark.annotation.DeveloperApi @DeveloperApi class ExecutorInfo( val executorHost: String, - val totalCores: Int -) { + val totalCores: Int, + val logUrlMap: Map[String, String]) { def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo] @@ -34,12 +34,13 @@ class ExecutorInfo( case that: ExecutorInfo => (that canEqual this) && executorHost == that.executorHost && - totalCores == that.totalCores + totalCores == that.totalCores && + logUrlMap == that.logUrlMap case _ => false } override def hashCode(): Int = { - val state = Seq(executorHost, totalCores) + val state = Seq(executorHost, totalCores, logUrlMap) state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c3c546be6d..cfb6592e14 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend( mesosTasks.foreach { case (slaveId, tasks) => slaveIdToWorkerOffer.get(slaveId).foreach(o => listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, - new ExecutorInfo(o.host, o.cores))) + // TODO: Add support for log urls for Mesos + new ExecutorInfo(o.host, o.cores, Map.empty))) ) d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 363cb96de7..956608d7c0 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -26,7 +26,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils /** Summary information about an executor to display in the UI. */ -private case class ExecutorSummaryInfo( +// Needs to be private[ui] because of a false positive MiMa failure. +private[ui] case class ExecutorSummaryInfo( id: String, hostPort: String, rddBlocks: Int, @@ -40,7 +41,8 @@ private case class ExecutorSummaryInfo( totalInputBytes: Long, totalShuffleRead: Long, totalShuffleWrite: Long, - maxMemory: Long) + maxMemory: Long, + executorLogs: Map[String, String]) private[ui] class ExecutorsPage( parent: ExecutorsTab, @@ -55,6 +57,7 @@ private[ui] class ExecutorsPage( val diskUsed = storageStatusList.map(_.diskUsed).sum val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId) val execInfoSorted = execInfo.sortBy(_.id) + val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty val execTable = <table class={UIUtils.TABLE_CLASS_STRIPED}> @@ -79,10 +82,11 @@ private[ui] class ExecutorsPage( Shuffle Write </span> </th> + {if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty} {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty} </thead> <tbody> - {execInfoSorted.map(execRow)} + {execInfoSorted.map(execRow(_, logsExist))} </tbody> </table> @@ -107,7 +111,7 @@ private[ui] class ExecutorsPage( } /** Render an HTML row representing an executor */ - private def execRow(info: ExecutorSummaryInfo): Seq[Node] = { + private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = { val maximumMemory = info.maxMemory val memoryUsed = info.memoryUsed val diskUsed = info.diskUsed @@ -139,6 +143,21 @@ private[ui] class ExecutorsPage( {Utils.bytesToString(info.totalShuffleWrite)} </td> { + if (logsExist) { + <td> + { + info.executorLogs.map { case (logName, logUrl) => + <div> + <a href={logUrl}> + {logName} + </a> + </div> + } + } + </td> + } + } + { if (threadDumpEnabled) { val encodedId = URLEncoder.encode(info.id, "UTF-8") <td> @@ -168,6 +187,7 @@ private[ui] class ExecutorsPage( val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L) val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L) val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L) + val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty) new ExecutorSummaryInfo( execId, @@ -183,7 +203,8 @@ private[ui] class ExecutorsPage( totalInputBytes, totalShuffleRead, totalShuffleWrite, - maxMem + maxMem, + executorLogs ) } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index dd1c2b78c4..a38cb75fdd 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -51,9 +51,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp val executorToOutputBytes = HashMap[String, Long]() val executorToShuffleRead = HashMap[String, Long]() val executorToShuffleWrite = HashMap[String, Long]() + val executorToLogUrls = HashMap[String, Map[String, String]]() def storageStatusList = storageStatusListener.storageStatusList + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized { + val eid = executorAdded.executorId + executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap + } + override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { val eid = taskStart.taskInfo.executorId executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8e0e41ad37..c8407bbcb7 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -383,7 +383,8 @@ private[spark] object JsonProtocol { def executorInfoToJson(executorInfo: ExecutorInfo): JValue = { ("Host" -> executorInfo.executorHost) ~ - ("Total Cores" -> executorInfo.totalCores) + ("Total Cores" -> executorInfo.totalCores) ~ + ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) } /** ------------------------------ * @@ -792,7 +793,8 @@ private[spark] object JsonProtocol { def executorInfoFromJson(json: JValue): ExecutorInfo = { val executorHost = (json \ "Host").extract[String] val totalCores = (json \ "Total Cores").extract[Int] - new ExecutorInfo(executorHost, totalCores) + val logUrls = mapFromJson(json \ "Log Urls").toMap + new ExecutorInfo(executorHost, totalCores, logUrls) } /** -------------------------------- * |