Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala                  |  7
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                          |  1
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala         |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala                |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala                |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala                         | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala                          |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                             |  6

11 files changed, 68 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index bc9f78b9e5..0add3064da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -43,6 +43,7 @@ private[spark] class ExecutorRunner(
val worker: ActorRef,
val workerId: String,
val host: String,
+ val webUiPort: Int,
val sparkHome: File,
val executorDir: File,
val workerUrl: String,
@@ -134,6 +135,12 @@ private[spark] class ExecutorRunner(
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
+
+ // Add webUI log urls
+ val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+ builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
+ builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
+
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
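A minimal sketch of the URL construction added above, with invented host, port,
appId and execId values (none of these come from the patch):

    val host = "192.168.1.10"
    val webUiPort = 8081
    val appId = "app-20150201"
    val execId = 3
    val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    // The executor's environment would then contain:
    // SPARK_LOG_URL_STDERR -> http://192.168.1.10:8081/logPage/?appId=app-20150201&executorId=3&logType=stderr
    // SPARK_LOG_URL_STDOUT -> http://192.168.1.10:8081/logPage/?appId=app-20150201&executorId=3&logType=stdout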
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b20f5c0c82..10929eb516 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -362,6 +362,7 @@ private[spark] class Worker(
self,
workerId,
host,
+ webUiPort,
sparkHome,
executorDir,
akkaUrl,
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index bc72c89703..3a42f8b157 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend(
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
- driver ! RegisterExecutor(executorId, hostPort, cores)
+ driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
+ def extractLogUrls: Map[String, String] = {
+ val prefix = "SPARK_LOG_URL_"
+ sys.env.filterKeys(_.startsWith(prefix))
+ .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+ }
+
override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 1da6fe976d..9bf74f4be1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
// Executors to driver
- case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+ case class RegisterExecutor(
+ executorId: String,
+ hostPort: String,
+ cores: Int,
+ logUrls: Map[String, String])
extends CoarseGrainedClusterMessage {
Utils.checkHostPort(hostPort, "Expected host port")
}
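For illustration, the widened message might be built like this (all values are
hypothetical; the checkHostPort guard only requires a host:port string):

    val msg = RegisterExecutor(
      executorId = "3",
      hostPort = "192.168.1.10:45123",
      cores = 4,
      logUrls = Map(
        "stdout" -> "http://192.168.1.10:8081/logPage/?appId=app-1&executorId=3&logType=stdout",
        "stderr" -> "http://192.168.1.10:8081/logPage/?appId=app-1&executorId=3&logType=stderr"))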
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 103a5c053c..9d2fb4f3b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}
def receiveWithLogging = {
- case RegisterExecutor(executorId, hostPort, cores) =>
+ case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
@@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
- val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
+ val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index eb52ddfb1e..5e571efe76 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -33,5 +33,6 @@ private[cluster] class ExecutorData(
val executorAddress: Address,
override val executorHost: String,
var freeCores: Int,
- override val totalCores: Int
-) extends ExecutorInfo(executorHost, totalCores)
+ override val totalCores: Int,
+ override val logUrlMap: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index b4738e64c9..7f21856614 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -25,8 +25,8 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
class ExecutorInfo(
val executorHost: String,
- val totalCores: Int
-) {
+ val totalCores: Int,
+ val logUrlMap: Map[String, String]) {
def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
@@ -34,12 +34,13 @@ class ExecutorInfo(
case that: ExecutorInfo =>
(that canEqual this) &&
executorHost == that.executorHost &&
- totalCores == that.totalCores
+ totalCores == that.totalCores &&
+ logUrlMap == that.logUrlMap
case _ => false
}
override def hashCode(): Int = {
- val state = Seq(executorHost, totalCores)
+ val state = Seq(executorHost, totalCores, logUrlMap)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
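Since logUrlMap now participates in equals and hashCode, two ExecutorInfo
instances compare equal only if their log URL maps match. A quick sketch with
hypothetical values:

    val urls = Map("stdout" -> "http://worker-1:8081/logPage/?logType=stdout")
    val a = new ExecutorInfo("worker-1", 4, urls)
    val b = new ExecutorInfo("worker-1", 4, urls)
    assert(a == b && a.hashCode == b.hashCode)
    assert(a != new ExecutorInfo("worker-1", 4, Map.empty))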
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index c3c546be6d..cfb6592e14 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend(
mesosTasks.foreach { case (slaveId, tasks) =>
slaveIdToWorkerOffer.get(slaveId).foreach(o =>
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
- new ExecutorInfo(o.host, o.cores)))
+ // TODO: Add support for log urls for Mesos
+ new ExecutorInfo(o.host, o.cores, Map.empty)))
)
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 363cb96de7..956608d7c0 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -26,7 +26,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
/** Summary information about an executor to display in the UI. */
-private case class ExecutorSummaryInfo(
+// Needs to be private[ui] because of a false positive MiMa failure.
+private[ui] case class ExecutorSummaryInfo(
id: String,
hostPort: String,
rddBlocks: Int,
@@ -40,7 +41,8 @@ private case class ExecutorSummaryInfo(
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
- maxMemory: Long)
+ maxMemory: Long,
+ executorLogs: Map[String, String])
private[ui] class ExecutorsPage(
parent: ExecutorsTab,
@@ -55,6 +57,7 @@ private[ui] class ExecutorsPage(
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execInfoSorted = execInfo.sortBy(_.id)
+ val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
val execTable =
<table class={UIUtils.TABLE_CLASS_STRIPED}>
@@ -79,10 +82,11 @@ private[ui] class ExecutorsPage(
Shuffle Write
</span>
</th>
+ {if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
- {execInfoSorted.map(execRow)}
+ {execInfoSorted.map(execRow(_, logsExist))}
</tbody>
</table>
@@ -107,7 +111,7 @@ private[ui] class ExecutorsPage(
}
/** Render an HTML row representing an executor */
- private def execRow(info: ExecutorSummaryInfo): Seq[Node] = {
+ private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = {
val maximumMemory = info.maxMemory
val memoryUsed = info.memoryUsed
val diskUsed = info.diskUsed
@@ -139,6 +143,21 @@ private[ui] class ExecutorsPage(
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
{
+ if (logsExist) {
+ <td>
+ {
+ info.executorLogs.map { case (logName, logUrl) =>
+ <div>
+ <a href={logUrl}>
+ {logName}
+ </a>
+ </div>
+ }
+ }
+ </td>
+ }
+ }
+ {
if (threadDumpEnabled) {
val encodedId = URLEncoder.encode(info.id, "UTF-8")
<td>
@@ -168,6 +187,7 @@ private[ui] class ExecutorsPage(
val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
+ val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
new ExecutorSummaryInfo(
execId,
@@ -183,7 +203,8 @@ private[ui] class ExecutorsPage(
totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
- maxMem
+ maxMem,
+ executorLogs
)
}
}
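The logsExist check in the hunks above hides the "Logs" column entirely when no
executor reports any URLs. The filter/nonEmpty chain is equivalent to the more
direct exists form (behaviorally identical, though not what the patch uses):

    val logsExist = execInfo.exists(_.executorLogs.nonEmpty)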
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index dd1c2b78c4..a38cb75fdd 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -51,9 +51,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToOutputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
+ val executorToLogUrls = HashMap[String, Map[String, String]]()
def storageStatusList = storageStatusListener.storageStatusList
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized {
+ val eid = executorAdded.executorId
+ executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+ }
+
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
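A minimal sketch of the new callback in action, assuming the no-argument
StorageStatusListener constructor of this era; the event values are invented:

    import org.apache.spark.scheduler.SparkListenerExecutorAdded
    import org.apache.spark.scheduler.cluster.ExecutorInfo
    import org.apache.spark.storage.StorageStatusListener

    val listener = new ExecutorsListener(new StorageStatusListener)
    val info = new ExecutorInfo("worker-1", 4,
      Map("stderr" -> "http://worker-1:8081/logPage/?appId=app-1&executorId=0&logType=stderr"))
    listener.onExecutorAdded(SparkListenerExecutorAdded(System.currentTimeMillis(), "0", info))
    assert(listener.executorToLogUrls("0") == info.logUrlMap)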
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8e0e41ad37..c8407bbcb7 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -383,7 +383,8 @@ private[spark] object JsonProtocol {
def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
("Host" -> executorInfo.executorHost) ~
- ("Total Cores" -> executorInfo.totalCores)
+ ("Total Cores" -> executorInfo.totalCores) ~
+ ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
}
/** ------------------------------ *
@@ -792,7 +793,8 @@ private[spark] object JsonProtocol {
def executorInfoFromJson(json: JValue): ExecutorInfo = {
val executorHost = (json \ "Host").extract[String]
val totalCores = (json \ "Total Cores").extract[Int]
- new ExecutorInfo(executorHost, totalCores)
+ val logUrls = mapFromJson(json \ "Log Urls").toMap
+ new ExecutorInfo(executorHost, totalCores, logUrls)
}
/** -------------------------------- *
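With the new field, executorInfoToJson emits the log URLs as a nested JSON
object. A sketch of the resulting shape using json4s directly (host, core count
and URLs are invented):

    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    val json = ("Host" -> "worker-1") ~
      ("Total Cores" -> 4) ~
      ("Log Urls" -> Map(
        "stdout" -> "http://worker-1:8081/logPage/?appId=app-1&executorId=0&logType=stdout",
        "stderr" -> "http://worker-1:8081/logPage/?appId=app-1&executorId=0&logType=stderr"))
    println(compact(render(json)))
    // {"Host":"worker-1","Total Cores":4,"Log Urls":{"stdout":"...","stderr":"..."}}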