author     Kostas Sakellis <kostas@cloudera.com>    2015-02-06 11:13:00 -0800
committer  Josh Rosen <joshrosen@databricks.com>    2015-02-06 11:13:00 -0800
commit     32e964c410e7083b43264c46291e93cd206a8038 (patch)
tree       9b46fe25fbac715da10c2689506ef00ee640b9b3 /yarn/src
parent     4cdb26c174e479a144950d12e1ad180f361af1fd (diff)
SPARK-2450 Adds executor log links to Web UI
Adds links to stderr/stdout in the executor tab of the web UI for:
1) Standalone
2) Yarn client
3) Yarn cluster

This tries to add the log url support in a general way, so as to make it easy to add support for all the cluster managers. This is done by using environment variables to pass the log urls to the executor. The SPARK_LOG_URL_ prefix is used, so additional logs besides stderr/stdout can also be added. To propagate this information to the UI we use the onExecutorAdded spark listener event.

Although this commit doesn't add log urls when running on a mesos cluster, it should be possible to add them using the same mechanism.

Author: Kostas Sakellis <kostas@cloudera.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #3486 from ksakellis/kostas-spark-2450 and squashes the following commits:

d190936 [Josh Rosen] Fix a few minor style / formatting nits. Reset listener after each test. Don't null listener out at end of main().
8673fe1 [Kostas Sakellis] CR feedback. Hide the log column if there are no logs available.
5bf6952 [Kostas Sakellis] [SPARK-2450] [CORE] Adds executor log links to Web UI
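The mechanism described above lends itself to a small illustration. A minimal sketch, assuming a hypothetical executor-side helper (the name extractLogUrls and its placement are not taken from this patch), of how the SPARK_LOG_URL_ convention turns environment variables into the log-name-to-URL map that the onExecutorAdded event can carry:

    // Sketch: collect every SPARK_LOG_URL_* variable into a (log name -> URL)
    // map by stripping the prefix; any extra log type is picked up for free.
    def extractLogUrls: Map[String, String] = {
      val prefix = "SPARK_LOG_URL_"
      sys.env.filterKeys(_.startsWith(prefix))
        .map { case (k, v) => (k.substring(prefix.length).toLowerCase, v) }
        .toMap
    }
    // e.g. SPARK_LOG_URL_STDERR=http://nm:8042/.../stderr?start=0 yields
    // Map("stderr" -> "http://nm:8042/.../stderr?start=0")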
Diffstat (limited to 'yarn/src')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 12
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 31
2 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ee2002a35f..408cf09b9b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -56,7 +56,7 @@ class ExecutorRunnable(
   var rpc: YarnRPC = YarnRPC.create(conf)
   var nmClient: NMClient = _
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  lazy val env = prepareEnvironment
+  lazy val env = prepareEnvironment(container)
 
   def run = {
     logInfo("Starting Executor Container")
@@ -254,7 +254,7 @@ class ExecutorRunnable(
     localResources
   }
 
-  private def prepareEnvironment: HashMap[String, String] = {
+  private def prepareEnvironment(container: Container): HashMap[String, String] = {
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
     Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
@@ -270,6 +270,14 @@ class ExecutorRunnable(
       YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
     }
 
+    // Add log urls
+    sys.env.get("SPARK_USER").foreach { user =>
+      val baseUrl = "http://%s/node/containerlogs/%s/%s"
+        .format(container.getNodeHttpAddress, ConverterUtils.toString(container.getId), user)
+      env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=0"
+      env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=0"
+    }
+
     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
     env
   }
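For illustration, here is how the two variables added above expand. The host, container id, and user below are made-up values; only the URL format itself comes from the patch:

    // Hypothetical example values; only the URL shape is taken from the patch.
    val nodeHttpAddress = "nm-host.example.com:8042"
    val containerId = "container_1423011000000_0001_01_000002"
    val user = "spark"

    val baseUrl = "http://%s/node/containerlogs/%s/%s"
      .format(nodeHttpAddress, containerId, user)
    // SPARK_LOG_URL_STDERR -> http://nm-host.example.com:8042/node/containerlogs/container_1423011000000_0001_01_000002/spark/stderr?start=0
    // SPARK_LOG_URL_STDOUT -> the same base URL with /stdout?start=0

These URLs point at the YARN NodeManager's web interface, which serves the container's log files.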
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 7165918e1b..eda40efc4c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -21,16 +21,17 @@ import java.io.File
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
 import org.apache.spark.util.Utils
 
 class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
@@ -143,6 +144,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     var result = File.createTempFile("result", null, tempDir)
     YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
     checkResult(result)
+
+    // verify log urls are present
+    YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+    }
   }
 
   test("run Spark in yarn-cluster mode") {
@@ -156,6 +162,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
       "--num-executors", "1")
     Client.main(args)
     checkResult(result)
+
+    // verify log urls are present.
+    YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+    }
   }
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -203,8 +214,19 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
   }
 }
 
+private class SaveExecutorInfo extends SparkListener {
+  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+
+  override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
+    addedExecutorInfos(executor.executorId) = executor.executorInfo
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
+  val WAIT_TIMEOUT_MILLIS = 10000
+  var listener: SaveExecutorInfo = null
+
   def main(args: Array[String]) = {
     if (args.length != 2) {
       System.err.println(
@@ -216,12 +238,15 @@ private object YarnClusterDriver extends Logging with Matchers {
       System.exit(1)
     }
 
+    listener = new SaveExecutorInfo
     val sc = new SparkContext(new SparkConf().setMaster(args(0))
      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+    sc.addSparkListener(listener)
     val status = new File(args(1))
     var result = "failure"
     try {
       val data = sc.parallelize(1 to 4, 4).collect().toSet
+      assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
       data should be (Set(1, 2, 3, 4))
       result = "success"
     } finally {
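Beyond this test, any application can observe the propagated URLs the same way: register a SparkListener and read logUrlMap off the SparkListenerExecutorAdded event. A minimal sketch, assuming the ExecutorInfo.logUrlMap field this patch introduces (the object name and the job are placeholders):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
    import org.apache.spark.{SparkConf, SparkContext}

    object LogUrlProbe {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("log-url-probe"))
        sc.addSparkListener(new SparkListener {
          override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
            // logUrlMap maps a log name such as "stderr" to its web UI URL
            executor.executorInfo.logUrlMap.foreach { case (name, url) =>
              println(s"executor ${executor.executorId}: $name -> $url")
            }
          }
        })
        sc.parallelize(1 to 4, 4).count() // force executors to be allocated
        sc.stop()
      }
    }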