diff options
18 files changed, 178 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index bc9f78b9e5..0add3064da 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -43,6 +43,7 @@ private[spark] class ExecutorRunner( val worker: ActorRef, val workerId: String, val host: String, + val webUiPort: Int, val sparkHome: File, val executorDir: File, val workerUrl: String, @@ -134,6 +135,12 @@ private[spark] class ExecutorRunner( // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") + + // Add webUI log urls + val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" + builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr") + builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout") + process = builder.start() val header = "Spark Executor Command: %s\n%s\n\n".format( command.mkString("\"", "\" \"", "\""), "=" * 40) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index b20f5c0c82..10929eb516 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -362,6 +362,7 @@ private[spark] class Worker( self, workerId, host, + webUiPort, sparkHome, executorDir, akkaUrl, diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index bc72c89703..3a42f8b157 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend( override def preStart() { logInfo("Connecting to driver: " + driverUrl) driver = context.actorSelection(driverUrl) - driver ! RegisterExecutor(executorId, hostPort, cores) + driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } + def extractLogUrls: Map[String, String] = { + val prefix = "SPARK_LOG_URL_" + sys.env.filterKeys(_.startsWith(prefix)) + .map(e => (e._1.substring(prefix.length).toLowerCase, e._2)) + } + override def receiveWithLogging = { case RegisteredExecutor => logInfo("Successfully registered with driver") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 1da6fe976d..9bf74f4be1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages { case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage // Executors to driver - case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) + case class RegisterExecutor( + executorId: String, + hostPort: String, + cores: Int, + logUrls: Map[String, String]) extends CoarseGrainedClusterMessage { Utils.checkHostPort(hostPort, "Expected host port") } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 103a5c053c..9d2fb4f3b4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste } def receiveWithLogging = { - case RegisterExecutor(executorId, hostPort, cores) => + case RegisterExecutor(executorId, hostPort, cores, logUrls) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) if (executorDataMap.contains(executorId)) { sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) @@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val (host, _) = Utils.parseHostPort(hostPort) - val data = new ExecutorData(sender, sender.path.address, host, cores, cores) + val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls) // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index eb52ddfb1e..5e571efe76 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -33,5 +33,6 @@ private[cluster] class ExecutorData( val executorAddress: Address, override val executorHost: String, var freeCores: Int, - override val totalCores: Int -) extends ExecutorInfo(executorHost, totalCores) + override val totalCores: Int, + override val logUrlMap: Map[String, String] +) extends ExecutorInfo(executorHost, totalCores, logUrlMap) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala index b4738e64c9..7f21856614 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala @@ -25,8 +25,8 @@ import org.apache.spark.annotation.DeveloperApi @DeveloperApi class ExecutorInfo( val executorHost: String, - val totalCores: Int -) { + val totalCores: Int, + val logUrlMap: Map[String, String]) { def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo] @@ -34,12 +34,13 @@ class ExecutorInfo( case that: ExecutorInfo => (that canEqual this) && executorHost == that.executorHost && - totalCores == that.totalCores + totalCores == that.totalCores && + logUrlMap == that.logUrlMap case _ => false } override def hashCode(): Int = { - val state = Seq(executorHost, totalCores) + val state = Seq(executorHost, totalCores, logUrlMap) state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c3c546be6d..cfb6592e14 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend( mesosTasks.foreach { case (slaveId, tasks) => slaveIdToWorkerOffer.get(slaveId).foreach(o => listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, - new ExecutorInfo(o.host, o.cores))) + // TODO: Add support for log urls for Mesos + new ExecutorInfo(o.host, o.cores, Map.empty))) ) d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 363cb96de7..956608d7c0 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -26,7 +26,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils /** Summary information about an executor to display in the UI. */ -private case class ExecutorSummaryInfo( +// Needs to be private[ui] because of a false positive MiMa failure. +private[ui] case class ExecutorSummaryInfo( id: String, hostPort: String, rddBlocks: Int, @@ -40,7 +41,8 @@ private case class ExecutorSummaryInfo( totalInputBytes: Long, totalShuffleRead: Long, totalShuffleWrite: Long, - maxMemory: Long) + maxMemory: Long, + executorLogs: Map[String, String]) private[ui] class ExecutorsPage( parent: ExecutorsTab, @@ -55,6 +57,7 @@ private[ui] class ExecutorsPage( val diskUsed = storageStatusList.map(_.diskUsed).sum val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId) val execInfoSorted = execInfo.sortBy(_.id) + val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty val execTable = <table class={UIUtils.TABLE_CLASS_STRIPED}> @@ -79,10 +82,11 @@ private[ui] class ExecutorsPage( Shuffle Write </span> </th> + {if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty} {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty} </thead> <tbody> - {execInfoSorted.map(execRow)} + {execInfoSorted.map(execRow(_, logsExist))} </tbody> </table> @@ -107,7 +111,7 @@ private[ui] class ExecutorsPage( } /** Render an HTML row representing an executor */ - private def execRow(info: ExecutorSummaryInfo): Seq[Node] = { + private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = { val maximumMemory = info.maxMemory val memoryUsed = info.memoryUsed val diskUsed = info.diskUsed @@ -139,6 +143,21 @@ private[ui] class ExecutorsPage( {Utils.bytesToString(info.totalShuffleWrite)} </td> { + if (logsExist) { + <td> + { + info.executorLogs.map { case (logName, logUrl) => + <div> + <a href={logUrl}> + {logName} + </a> + </div> + } + } + </td> + } + } + { if (threadDumpEnabled) { val encodedId = URLEncoder.encode(info.id, "UTF-8") <td> @@ -168,6 +187,7 @@ private[ui] class ExecutorsPage( val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L) val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L) val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L) + val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty) new ExecutorSummaryInfo( execId, @@ -183,7 +203,8 @@ private[ui] class ExecutorsPage( totalInputBytes, totalShuffleRead, totalShuffleWrite, - maxMem + maxMem, + executorLogs ) } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index dd1c2b78c4..a38cb75fdd 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -51,9 +51,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp val executorToOutputBytes = HashMap[String, Long]() val executorToShuffleRead = HashMap[String, Long]() val executorToShuffleWrite = HashMap[String, Long]() + val executorToLogUrls = HashMap[String, Map[String, String]]() def storageStatusList = storageStatusListener.storageStatusList + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized { + val eid = executorAdded.executorId + executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap + } + override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { val eid = taskStart.taskInfo.executorId executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8e0e41ad37..c8407bbcb7 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -383,7 +383,8 @@ private[spark] object JsonProtocol { def executorInfoToJson(executorInfo: ExecutorInfo): JValue = { ("Host" -> executorInfo.executorHost) ~ - ("Total Cores" -> executorInfo.totalCores) + ("Total Cores" -> executorInfo.totalCores) ~ + ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) } /** ------------------------------ * @@ -792,7 +793,8 @@ private[spark] object JsonProtocol { def executorInfoFromJson(json: JValue): ExecutorInfo = { val executorHost = (json \ "Host").extract[String] val totalCores = (json \ "Total Cores").extract[Int] - new ExecutorInfo(executorHost, totalCores) + val logUrls = mapFromJson(json \ "Log Urls").toMap + new ExecutorInfo(executorHost, totalCores, logUrls) } /** -------------------------------- * diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index aa65f7e891..ed02ca81e4 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -117,7 +117,7 @@ class JsonProtocolSuite extends FunSuite { } def createExecutorRunner(): ExecutorRunner = { - new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", + new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123, new File("sparkHome"), new File("workDir"), "akka://worker", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) } diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala new file mode 100644 index 0000000000..f33bdc73e4 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import scala.collection.mutable + +import org.scalatest.{BeforeAndAfter, FunSuite} + +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener} +import org.apache.spark.{SparkContext, LocalSparkContext} + +class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter { + + /** Length of time to wait while draining listener events. */ + val WAIT_TIMEOUT_MILLIS = 10000 + + before { + sc = new SparkContext("local-cluster[2,1,512]", "test") + } + + test("verify log urls get propagated from workers") { + val listener = new SaveExecutorInfo + sc.addSparkListener(listener) + + val rdd1 = sc.parallelize(1 to 100, 4) + val rdd2 = rdd1.map(_.toString) + rdd2.setName("Target RDD") + rdd2.count() + + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } + } + + private class SaveExecutorInfo extends SparkListener { + val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() + + override def onExecutorAdded(executor: SparkListenerExecutorAdded) { + addedExecutorInfos(executor.executorId) = executor.executorInfo + } + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 6f233d7cf9..76511699e5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -32,7 +32,7 @@ class ExecutorRunnerTest extends FunSuite { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl") - val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", + val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123, new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables) diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index f2ff98eb72..46ab02bfef 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea conf.set("spark.mesos.executor.home" , "/mesos-home") val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) - listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2))) + listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) EasyMock.replay(listenerBus) val sc = EasyMock.createMock(classOf[SparkContext]) @@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) - listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2))) + listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) EasyMock.replay(listenerBus) val sc = EasyMock.createMock(classOf[SparkContext]) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 6577ebaa2e..842f54529b 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -76,8 +76,9 @@ class JsonProtocolSuite extends FunSuite { val unpersistRdd = SparkListenerUnpersistRDD(12345) val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield") val applicationEnd = SparkListenerApplicationEnd(42L) + val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1", - new ExecutorInfo("Hostee.awesome.com", 11)) + new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap)) val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason") testEvent(stageSubmitted, stageSubmittedJsonString) @@ -100,13 +101,14 @@ class JsonProtocolSuite extends FunSuite { } test("Dependent Classes") { + val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) testTaskMetrics(makeTaskMetrics( 33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500)) - testExecutorInfo(new ExecutorInfo("host", 43)) + testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap)) // StorageLevel testStorageLevel(StorageLevel.NONE) @@ -1463,7 +1465,11 @@ class JsonProtocolSuite extends FunSuite { | "Executor ID": "exec1", | "Executor Info": { | "Host": "Hostee.awesome.com", - | "Total Cores": 11 + | "Total Cores": 11, + | "Log Urls" : { + | "stderr" : "mystderr", + | "stdout" : "mystdout" + | } | } |} """ diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index ee2002a35f..408cf09b9b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -56,7 +56,7 @@ class ExecutorRunnable( var rpc: YarnRPC = YarnRPC.create(conf) var nmClient: NMClient = _ val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - lazy val env = prepareEnvironment + lazy val env = prepareEnvironment(container) def run = { logInfo("Starting Executor Container") @@ -254,7 +254,7 @@ class ExecutorRunnable( localResources } - private def prepareEnvironment: HashMap[String, String] = { + private def prepareEnvironment(container: Container): HashMap[String, String] = { val env = new HashMap[String, String]() val extraCp = sparkConf.getOption("spark.executor.extraClassPath") Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp) @@ -270,6 +270,14 @@ class ExecutorRunnable( YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) } + // Add log urls + sys.env.get("SPARK_USER").foreach { user => + val baseUrl = "http://%s/node/containerlogs/%s/%s" + .format(container.getNodeHttpAddress, ConverterUtils.toString(container.getId), user) + env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=0" + env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=0" + } + System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v } env } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 7165918e1b..eda40efc4c 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -21,16 +21,17 @@ import java.io.File import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ +import scala.collection.mutable import com.google.common.base.Charsets import com.google.common.io.Files -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} - import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.util.Utils class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { @@ -143,6 +144,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit var result = File.createTempFile("result", null, tempDir) YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath())) checkResult(result) + + // verify log urls are present + YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } } test("run Spark in yarn-cluster mode") { @@ -156,6 +162,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit "--num-executors", "1") Client.main(args) checkResult(result) + + // verify log urls are present. + YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } } test("run Spark in yarn-cluster mode unsuccessfully") { @@ -203,8 +214,19 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit } +private class SaveExecutorInfo extends SparkListener { + val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() + + override def onExecutorAdded(executor : SparkListenerExecutorAdded) { + addedExecutorInfos(executor.executorId) = executor.executorInfo + } +} + private object YarnClusterDriver extends Logging with Matchers { + val WAIT_TIMEOUT_MILLIS = 10000 + var listener: SaveExecutorInfo = null + def main(args: Array[String]) = { if (args.length != 2) { System.err.println( @@ -216,12 +238,15 @@ private object YarnClusterDriver extends Logging with Matchers { System.exit(1) } + listener = new SaveExecutorInfo val sc = new SparkContext(new SparkConf().setMaster(args(0)) .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) + sc.addSparkListener(listener) val status = new File(args(1)) var result = "failure" try { val data = sc.parallelize(1 to 4, 4).collect().toSet + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) data should be (Set(1, 2, 3, 4)) result = "success" } finally { |