author     Kostas Sakellis <kostas@cloudera.com>    2015-02-06 11:13:00 -0800
committer  Josh Rosen <joshrosen@databricks.com>    2015-02-06 11:13:00 -0800
commit     32e964c410e7083b43264c46291e93cd206a8038 (patch)
tree       9b46fe25fbac715da10c2689506ef00ee640b9b3 /core
parent     4cdb26c174e479a144950d12e1ad180f361af1fd (diff)
SPARK-2450 Adds executor log links to Web UI
Adds links to stderr/stdout in the executor tab of the web UI for:

1) Standalone
2) YARN client
3) YARN cluster

This adds log URL support in a general way, to make it easy to add support for all the cluster managers. This is done by using environment variables to pass the log URLs to the executor. The SPARK_LOG_URL_ prefix is used, so additional logs besides stderr/stdout can also be added. To propagate this information to the UI we use the onExecutorAdded Spark listener event. Although this commit doesn't add log URLs when running on a Mesos cluster, it should be possible to add them using the same mechanism.

Author: Kostas Sakellis <kostas@cloudera.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #3486 from ksakellis/kostas-spark-2450 and squashes the following commits:

d190936 [Josh Rosen] Fix a few minor style / formatting nits. Reset listener after each test. Don't null listener out at end of main().
8673fe1 [Kostas Sakellis] CR feedback. Hide the log column if there are no logs available.
5bf6952 [Kostas Sakellis] [SPARK-2450] [CORE] Adds executor log links to Web UI
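For reference, the environment-variable convention the commit relies on can be sketched in a few lines of plain Scala (no Spark imports; the object name, host, port, and IDs below are illustrative placeholders, not part of the patch):

// Minimal sketch of the SPARK_LOG_URL_ convention described above.
// Plain Scala, no Spark dependencies; all concrete values are placeholders.
object LogUrlSketch {
  private val Prefix = "SPARK_LOG_URL_"

  // Worker side: build one URL per log file from the worker's web UI address
  // and export them as SPARK_LOG_URL_STDERR / SPARK_LOG_URL_STDOUT.
  def buildLogUrlEnv(host: String, webUiPort: Int, appId: String, execId: Int): Map[String, String] = {
    val base = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    Map(
      s"${Prefix}STDERR" -> s"${base}stderr",
      s"${Prefix}STDOUT" -> s"${base}stdout")
  }

  // Executor side: recover the map from the environment, stripping the prefix
  // and lower-casing the log name ("stderr", "stdout", ...).
  def extractLogUrls(env: Map[String, String]): Map[String, String] =
    env.filterKeys(_.startsWith(Prefix))
      .map { case (name, url) => name.stripPrefix(Prefix).toLowerCase -> url }
      .toMap

  def main(args: Array[String]): Unit = {
    val env = buildLogUrlEnv("worker1.example.com", 8081, "app-20150206", 0)
    // Prints: Map(stderr -> http://worker1.example.com:8081/..., stdout -> ...)
    println(extractLogUrls(env))
  }
}

On the driver side, the patch carries this map through RegisterExecutor into ExecutorInfo.logUrlMap, so any SparkListener can read it from the onExecutorAdded event, as the new LogUrlsStandaloneSuite in the diff below demonstrates.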
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 7
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala | 59
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 12
16 files changed, 140 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index bc9f78b9e5..0add3064da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -43,6 +43,7 @@ private[spark] class ExecutorRunner(
val worker: ActorRef,
val workerId: String,
val host: String,
+ val webUiPort: Int,
val sparkHome: File,
val executorDir: File,
val workerUrl: String,
@@ -134,6 +135,12 @@ private[spark] class ExecutorRunner(
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
+
+ // Add webUI log urls
+ val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+ builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
+ builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
+
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b20f5c0c82..10929eb516 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -362,6 +362,7 @@ private[spark] class Worker(
self,
workerId,
host,
+ webUiPort,
sparkHome,
executorDir,
akkaUrl,
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index bc72c89703..3a42f8b157 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend(
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
- driver ! RegisterExecutor(executorId, hostPort, cores)
+ driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
+ def extractLogUrls: Map[String, String] = {
+ val prefix = "SPARK_LOG_URL_"
+ sys.env.filterKeys(_.startsWith(prefix))
+ .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+ }
+
override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 1da6fe976d..9bf74f4be1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
// Executors to driver
- case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+ case class RegisterExecutor(
+ executorId: String,
+ hostPort: String,
+ cores: Int,
+ logUrls: Map[String, String])
extends CoarseGrainedClusterMessage {
Utils.checkHostPort(hostPort, "Expected host port")
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 103a5c053c..9d2fb4f3b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}
def receiveWithLogging = {
- case RegisterExecutor(executorId, hostPort, cores) =>
+ case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
@@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
- val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
+ val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index eb52ddfb1e..5e571efe76 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -33,5 +33,6 @@ private[cluster] class ExecutorData(
val executorAddress: Address,
override val executorHost: String,
var freeCores: Int,
- override val totalCores: Int
-) extends ExecutorInfo(executorHost, totalCores)
+ override val totalCores: Int,
+ override val logUrlMap: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index b4738e64c9..7f21856614 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -25,8 +25,8 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
class ExecutorInfo(
val executorHost: String,
- val totalCores: Int
-) {
+ val totalCores: Int,
+ val logUrlMap: Map[String, String]) {
def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
@@ -34,12 +34,13 @@ class ExecutorInfo(
case that: ExecutorInfo =>
(that canEqual this) &&
executorHost == that.executorHost &&
- totalCores == that.totalCores
+ totalCores == that.totalCores &&
+ logUrlMap == that.logUrlMap
case _ => false
}
override def hashCode(): Int = {
- val state = Seq(executorHost, totalCores)
+ val state = Seq(executorHost, totalCores, logUrlMap)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index c3c546be6d..cfb6592e14 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend(
mesosTasks.foreach { case (slaveId, tasks) =>
slaveIdToWorkerOffer.get(slaveId).foreach(o =>
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
- new ExecutorInfo(o.host, o.cores)))
+ // TODO: Add support for log urls for Mesos
+ new ExecutorInfo(o.host, o.cores, Map.empty)))
)
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 363cb96de7..956608d7c0 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -26,7 +26,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
/** Summary information about an executor to display in the UI. */
-private case class ExecutorSummaryInfo(
+// Needs to be private[ui] because of a false positive MiMa failure.
+private[ui] case class ExecutorSummaryInfo(
id: String,
hostPort: String,
rddBlocks: Int,
@@ -40,7 +41,8 @@ private case class ExecutorSummaryInfo(
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
- maxMemory: Long)
+ maxMemory: Long,
+ executorLogs: Map[String, String])
private[ui] class ExecutorsPage(
parent: ExecutorsTab,
@@ -55,6 +57,7 @@ private[ui] class ExecutorsPage(
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execInfoSorted = execInfo.sortBy(_.id)
+ val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
val execTable =
<table class={UIUtils.TABLE_CLASS_STRIPED}>
@@ -79,10 +82,11 @@ private[ui] class ExecutorsPage(
Shuffle Write
</span>
</th>
+ {if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
- {execInfoSorted.map(execRow)}
+ {execInfoSorted.map(execRow(_, logsExist))}
</tbody>
</table>
@@ -107,7 +111,7 @@ private[ui] class ExecutorsPage(
}
/** Render an HTML row representing an executor */
- private def execRow(info: ExecutorSummaryInfo): Seq[Node] = {
+ private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = {
val maximumMemory = info.maxMemory
val memoryUsed = info.memoryUsed
val diskUsed = info.diskUsed
@@ -139,6 +143,21 @@ private[ui] class ExecutorsPage(
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
{
+ if (logsExist) {
+ <td>
+ {
+ info.executorLogs.map { case (logName, logUrl) =>
+ <div>
+ <a href={logUrl}>
+ {logName}
+ </a>
+ </div>
+ }
+ }
+ </td>
+ }
+ }
+ {
if (threadDumpEnabled) {
val encodedId = URLEncoder.encode(info.id, "UTF-8")
<td>
@@ -168,6 +187,7 @@ private[ui] class ExecutorsPage(
val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
+ val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
new ExecutorSummaryInfo(
execId,
@@ -183,7 +203,8 @@ private[ui] class ExecutorsPage(
totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
- maxMem
+ maxMem,
+ executorLogs
)
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index dd1c2b78c4..a38cb75fdd 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -51,9 +51,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToOutputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
+ val executorToLogUrls = HashMap[String, Map[String, String]]()
def storageStatusList = storageStatusListener.storageStatusList
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized {
+ val eid = executorAdded.executorId
+ executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+ }
+
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8e0e41ad37..c8407bbcb7 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -383,7 +383,8 @@ private[spark] object JsonProtocol {
def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
("Host" -> executorInfo.executorHost) ~
- ("Total Cores" -> executorInfo.totalCores)
+ ("Total Cores" -> executorInfo.totalCores) ~
+ ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
}
/** ------------------------------ *
@@ -792,7 +793,8 @@ private[spark] object JsonProtocol {
def executorInfoFromJson(json: JValue): ExecutorInfo = {
val executorHost = (json \ "Host").extract[String]
val totalCores = (json \ "Total Cores").extract[Int]
- new ExecutorInfo(executorHost, totalCores)
+ val logUrls = mapFromJson(json \ "Log Urls").toMap
+ new ExecutorInfo(executorHost, totalCores, logUrls)
}
/** -------------------------------- *
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index aa65f7e891..ed02ca81e4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -117,7 +117,7 @@ class JsonProtocolSuite extends FunSuite {
}
def createExecutorRunner(): ExecutorRunner = {
- new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
+ new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
new file mode 100644
index 0000000000..f33bdc73e4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import scala.collection.mutable
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
+import org.apache.spark.{SparkContext, LocalSparkContext}
+
+class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
+
+ /** Length of time to wait while draining listener events. */
+ val WAIT_TIMEOUT_MILLIS = 10000
+
+ before {
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ }
+
+ test("verify log urls get propagated from workers") {
+ val listener = new SaveExecutorInfo
+ sc.addSparkListener(listener)
+
+ val rdd1 = sc.parallelize(1 to 100, 4)
+ val rdd2 = rdd1.map(_.toString)
+ rdd2.setName("Target RDD")
+ rdd2.count()
+
+ assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ listener.addedExecutorInfos.values.foreach { info =>
+ assert(info.logUrlMap.nonEmpty)
+ }
+ }
+
+ private class SaveExecutorInfo extends SparkListener {
+ val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+
+ override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
+ addedExecutorInfos(executor.executorId) = executor.executorInfo
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index 6f233d7cf9..76511699e5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -32,7 +32,7 @@ class ExecutorRunnerTest extends FunSuite {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
- val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
+ val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index f2ff98eb72..46ab02bfef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
conf.set("spark.mesos.executor.home" , "/mesos-home")
val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
- listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
+ listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)
val sc = EasyMock.createMock(classOf[SparkContext])
@@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
- listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
+ listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)
val sc = EasyMock.createMock(classOf[SparkContext])
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6577ebaa2e..842f54529b 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -76,8 +76,9 @@ class JsonProtocolSuite extends FunSuite {
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
+ val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
- new ExecutorInfo("Hostee.awesome.com", 11))
+ new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
testEvent(stageSubmitted, stageSubmittedJsonString)
@@ -100,13 +101,14 @@ class JsonProtocolSuite extends FunSuite {
}
test("Dependent Classes") {
+ val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(
33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
- testExecutorInfo(new ExecutorInfo("host", 43))
+ testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap))
// StorageLevel
testStorageLevel(StorageLevel.NONE)
@@ -1463,7 +1465,11 @@ class JsonProtocolSuite extends FunSuite {
| "Executor ID": "exec1",
| "Executor Info": {
| "Host": "Hostee.awesome.com",
- | "Total Cores": 11
+ | "Total Cores": 11,
+ | "Log Urls" : {
+ | "stderr" : "mystderr",
+ | "stdout" : "mystdout"
+ | }
| }
|}
"""