aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala7
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala59
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala12
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala12
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala31
18 files changed, 178 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index bc9f78b9e5..0add3064da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -43,6 +43,7 @@ private[spark] class ExecutorRunner(
val worker: ActorRef,
val workerId: String,
val host: String,
+ val webUiPort: Int,
val sparkHome: File,
val executorDir: File,
val workerUrl: String,
@@ -134,6 +135,12 @@ private[spark] class ExecutorRunner(
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
+
+ // Add webUI log urls
+ val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+ builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
+ builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
+
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b20f5c0c82..10929eb516 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -362,6 +362,7 @@ private[spark] class Worker(
self,
workerId,
host,
+ webUiPort,
sparkHome,
executorDir,
akkaUrl,
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index bc72c89703..3a42f8b157 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend(
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
- driver ! RegisterExecutor(executorId, hostPort, cores)
+ driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
+ def extractLogUrls: Map[String, String] = {
+ val prefix = "SPARK_LOG_URL_"
+ sys.env.filterKeys(_.startsWith(prefix))
+ .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+ }
+
override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 1da6fe976d..9bf74f4be1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
// Executors to driver
- case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+ case class RegisterExecutor(
+ executorId: String,
+ hostPort: String,
+ cores: Int,
+ logUrls: Map[String, String])
extends CoarseGrainedClusterMessage {
Utils.checkHostPort(hostPort, "Expected host port")
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 103a5c053c..9d2fb4f3b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}
def receiveWithLogging = {
- case RegisterExecutor(executorId, hostPort, cores) =>
+ case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
@@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
- val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
+ val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index eb52ddfb1e..5e571efe76 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -33,5 +33,6 @@ private[cluster] class ExecutorData(
val executorAddress: Address,
override val executorHost: String,
var freeCores: Int,
- override val totalCores: Int
-) extends ExecutorInfo(executorHost, totalCores)
+ override val totalCores: Int,
+ override val logUrlMap: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index b4738e64c9..7f21856614 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -25,8 +25,8 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
class ExecutorInfo(
val executorHost: String,
- val totalCores: Int
-) {
+ val totalCores: Int,
+ val logUrlMap: Map[String, String]) {
def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
@@ -34,12 +34,13 @@ class ExecutorInfo(
case that: ExecutorInfo =>
(that canEqual this) &&
executorHost == that.executorHost &&
- totalCores == that.totalCores
+ totalCores == that.totalCores &&
+ logUrlMap == that.logUrlMap
case _ => false
}
override def hashCode(): Int = {
- val state = Seq(executorHost, totalCores)
+ val state = Seq(executorHost, totalCores, logUrlMap)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index c3c546be6d..cfb6592e14 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend(
mesosTasks.foreach { case (slaveId, tasks) =>
slaveIdToWorkerOffer.get(slaveId).foreach(o =>
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
- new ExecutorInfo(o.host, o.cores)))
+ // TODO: Add support for log urls for Mesos
+ new ExecutorInfo(o.host, o.cores, Map.empty)))
)
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 363cb96de7..956608d7c0 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -26,7 +26,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
/** Summary information about an executor to display in the UI. */
-private case class ExecutorSummaryInfo(
+// Needs to be private[ui] because of a false positive MiMa failure.
+private[ui] case class ExecutorSummaryInfo(
id: String,
hostPort: String,
rddBlocks: Int,
@@ -40,7 +41,8 @@ private case class ExecutorSummaryInfo(
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
- maxMemory: Long)
+ maxMemory: Long,
+ executorLogs: Map[String, String])
private[ui] class ExecutorsPage(
parent: ExecutorsTab,
@@ -55,6 +57,7 @@ private[ui] class ExecutorsPage(
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execInfoSorted = execInfo.sortBy(_.id)
+ val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
val execTable =
<table class={UIUtils.TABLE_CLASS_STRIPED}>
@@ -79,10 +82,11 @@ private[ui] class ExecutorsPage(
Shuffle Write
</span>
</th>
+ {if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
- {execInfoSorted.map(execRow)}
+ {execInfoSorted.map(execRow(_, logsExist))}
</tbody>
</table>
@@ -107,7 +111,7 @@ private[ui] class ExecutorsPage(
}
/** Render an HTML row representing an executor */
- private def execRow(info: ExecutorSummaryInfo): Seq[Node] = {
+ private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = {
val maximumMemory = info.maxMemory
val memoryUsed = info.memoryUsed
val diskUsed = info.diskUsed
@@ -139,6 +143,21 @@ private[ui] class ExecutorsPage(
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
{
+ if (logsExist) {
+ <td>
+ {
+ info.executorLogs.map { case (logName, logUrl) =>
+ <div>
+ <a href={logUrl}>
+ {logName}
+ </a>
+ </div>
+ }
+ }
+ </td>
+ }
+ }
+ {
if (threadDumpEnabled) {
val encodedId = URLEncoder.encode(info.id, "UTF-8")
<td>
@@ -168,6 +187,7 @@ private[ui] class ExecutorsPage(
val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
+ val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
new ExecutorSummaryInfo(
execId,
@@ -183,7 +203,8 @@ private[ui] class ExecutorsPage(
totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
- maxMem
+ maxMem,
+ executorLogs
)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index dd1c2b78c4..a38cb75fdd 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -51,9 +51,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToOutputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
+ val executorToLogUrls = HashMap[String, Map[String, String]]()
def storageStatusList = storageStatusListener.storageStatusList
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized {
+ val eid = executorAdded.executorId
+ executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+ }
+
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8e0e41ad37..c8407bbcb7 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -383,7 +383,8 @@ private[spark] object JsonProtocol {
def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
("Host" -> executorInfo.executorHost) ~
- ("Total Cores" -> executorInfo.totalCores)
+ ("Total Cores" -> executorInfo.totalCores) ~
+ ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
}
/** ------------------------------ *
@@ -792,7 +793,8 @@ private[spark] object JsonProtocol {
def executorInfoFromJson(json: JValue): ExecutorInfo = {
val executorHost = (json \ "Host").extract[String]
val totalCores = (json \ "Total Cores").extract[Int]
- new ExecutorInfo(executorHost, totalCores)
+ val logUrls = mapFromJson(json \ "Log Urls").toMap
+ new ExecutorInfo(executorHost, totalCores, logUrls)
}
/** -------------------------------- *
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index aa65f7e891..ed02ca81e4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -117,7 +117,7 @@ class JsonProtocolSuite extends FunSuite {
}
def createExecutorRunner(): ExecutorRunner = {
- new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
+ new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
new file mode 100644
index 0000000000..f33bdc73e4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.mutable
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
+import org.apache.spark.{SparkContext, LocalSparkContext}
+
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+
+ /** Length of time to wait while draining listener events. */
+ val WAIT_TIMEOUT_MILLIS = 10000
+
+ before {
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ }
+
+ test("verify log urls get propagated from workers") {
+ val listener = new SaveExecutorInfo
+ sc.addSparkListener(listener)
+
+ val rdd1 = sc.parallelize(1 to 100, 4)
+ val rdd2 = rdd1.map(_.toString)
+ rdd2.setName("Target RDD")
+ rdd2.count()
+
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ listener.addedExecutorInfos.values.foreach { info =>
+ assert(info.logUrlMap.nonEmpty)
+ }
+ }
+
+ private class SaveExecutorInfo extends SparkListener {
+ val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+
+ override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
+ addedExecutorInfos(executor.executorId) = executor.executorInfo
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 6f233d7cf9..76511699e5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -32,7 +32,7 @@ class ExecutorRunnerTest extends FunSuite {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
- val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
+ val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index f2ff98eb72..46ab02bfef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
conf.set("spark.mesos.executor.home" , "/mesos-home")
val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
- listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
+ listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)
val sc = EasyMock.createMock(classOf[SparkContext])
@@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
- listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
+ listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)
val sc = EasyMock.createMock(classOf[SparkContext])
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6577ebaa2e..842f54529b 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -76,8 +76,9 @@ class JsonProtocolSuite extends FunSuite {
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
+ val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
- new ExecutorInfo("Hostee.awesome.com", 11))
+ new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
testEvent(stageSubmitted, stageSubmittedJsonString)
@@ -100,13 +101,14 @@ class JsonProtocolSuite extends FunSuite {
}
test("Dependent Classes") {
+ val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(
33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
- testExecutorInfo(new ExecutorInfo("host", 43))
+ testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap))
// StorageLevel
testStorageLevel(StorageLevel.NONE)
@@ -1463,7 +1465,11 @@ class JsonProtocolSuite extends FunSuite {
| "Executor ID": "exec1",
| "Executor Info": {
| "Host": "Hostee.awesome.com",
- | "Total Cores": 11
+ | "Total Cores": 11,
+ | "Log Urls" : {
+ | "stderr" : "mystderr",
+ | "stdout" : "mystdout"
+ | }
| }
|}
"""
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ee2002a35f..408cf09b9b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -56,7 +56,7 @@ class ExecutorRunnable(
var rpc: YarnRPC = YarnRPC.create(conf)
var nmClient: NMClient = _
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- lazy val env = prepareEnvironment
+ lazy val env = prepareEnvironment(container)
def run = {
logInfo("Starting Executor Container")
@@ -254,7 +254,7 @@ class ExecutorRunnable(
localResources
}
- private def prepareEnvironment: HashMap[String, String] = {
+ private def prepareEnvironment(container: Container): HashMap[String, String] = {
val env = new HashMap[String, String]()
val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
@@ -270,6 +270,14 @@ class ExecutorRunnable(
YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
}
+ // Add log urls
+ sys.env.get("SPARK_USER").foreach { user =>
+ val baseUrl = "http://%s/node/containerlogs/%s/%s"
+ .format(container.getNodeHttpAddress, ConverterUtils.toString(container.getId), user)
+ env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=0"
+ env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=0"
+ }
+
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
env
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 7165918e1b..eda40efc4c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -21,16 +21,17 @@ import java.io.File
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
+import scala.collection.mutable
import com.google.common.base.Charsets
import com.google.common.io.Files
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.util.Utils
class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
@@ -143,6 +144,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
var result = File.createTempFile("result", null, tempDir)
YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
checkResult(result)
+
+ // verify log urls are present
+ YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
+ assert(info.logUrlMap.nonEmpty)
+ }
}
test("run Spark in yarn-cluster mode") {
@@ -156,6 +162,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
"--num-executors", "1")
Client.main(args)
checkResult(result)
+
+ // verify log urls are present.
+ YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
+ assert(info.logUrlMap.nonEmpty)
+ }
}
test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -203,8 +214,19 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
}
+private class SaveExecutorInfo extends SparkListener {
+ val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+
+ override def onExecutorAdded(executor : SparkListenerExecutorAdded) {
+ addedExecutorInfos(executor.executorId) = executor.executorInfo
+ }
+}
+
private object YarnClusterDriver extends Logging with Matchers {
+ val WAIT_TIMEOUT_MILLIS = 10000
+ var listener: SaveExecutorInfo = null
+
def main(args: Array[String]) = {
if (args.length != 2) {
System.err.println(
@@ -216,12 +238,15 @@ private object YarnClusterDriver extends Logging with Matchers {
System.exit(1)
}
+ listener = new SaveExecutorInfo
val sc = new SparkContext(new SparkConf().setMaster(args(0))
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+ sc.addSparkListener(listener)
val status = new File(args(1))
var result = "failure"
try {
val data = sc.parallelize(1 to 4, 4).collect().toSet
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
data should be (Set(1, 2, 3, 4))
result = "success"
} finally {