Diffstat (limited to 'yarn/src')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala  12
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala  31
2 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ee2002a35f..408cf09b9b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -56,7 +56,7 @@ class ExecutorRunnable(
   var rpc: YarnRPC = YarnRPC.create(conf)
   var nmClient: NMClient = _
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  lazy val env = prepareEnvironment
+  lazy val env = prepareEnvironment(container)
 
   def run = {
     logInfo("Starting Executor Container")
@@ -254,7 +254,7 @@ class ExecutorRunnable(
     localResources
   }
 
-  private def prepareEnvironment: HashMap[String, String] = {
+  private def prepareEnvironment(container: Container): HashMap[String, String] = {
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
     Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
@@ -270,6 +270,14 @@ class ExecutorRunnable(
       YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
     }
 
+    // Add log urls
+    sys.env.get("SPARK_USER").foreach { user =>
+      val baseUrl = "http://%s/node/containerlogs/%s/%s"
+        .format(container.getNodeHttpAddress, ConverterUtils.toString(container.getId), user)
+      env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=0"
+      env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=0"
+    }
+
     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
     env
   }
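
The change above exposes the container's stdout/stderr links to the executor process purely through two environment variables, SPARK_LOG_URL_STDERR and SPARK_LOG_URL_STDOUT. As a minimal, illustrative sketch of what a consumer of those variables can do, the snippet below collects every SPARK_LOG_URL_* entry from the environment into a name-to-URL map; the object and method names are invented for this sketch and are not part of the patch or of Spark's public API.

    // Sketch: turn SPARK_LOG_URL_* environment variables into a log-name -> URL map.
    object LogUrlEnv {
      private val prefix = "SPARK_LOG_URL_"

      def extractLogUrls(env: Map[String, String] = sys.env): Map[String, String] =
        env.filterKeys(_.startsWith(prefix))
          .map { case (k, v) => k.stripPrefix(prefix).toLowerCase -> v }
          .toMap
    }

    // For a container prepared by the code above, this yields entries such as
    // "stderr" -> "http://<nodeHttpAddress>/node/containerlogs/<containerId>/<user>/stderr?start=0".
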
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 7165918e1b..eda40efc4c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -21,16 +21,17 @@ import java.io.File
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
 import org.apache.spark.util.Utils
 
 class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
@@ -143,6 +144,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     var result = File.createTempFile("result", null, tempDir)
     YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
     checkResult(result)
+
+    // verify log urls are present
+    YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+    }
   }
 
   test("run Spark in yarn-cluster mode") {
@@ -156,6 +162,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
       "--num-executors", "1")
     Client.main(args)
     checkResult(result)
+
+    // verify log urls are present.
+    YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+    }
   }
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -203,8 +214,19 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
   }
 }
 
+private class SaveExecutorInfo extends SparkListener {
+  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+
+  override def onExecutorAdded(executor : SparkListenerExecutorAdded) {
+    addedExecutorInfos(executor.executorId) = executor.executorInfo
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
+  val WAIT_TIMEOUT_MILLIS = 10000
+  var listener: SaveExecutorInfo = null
+
   def main(args: Array[String]) = {
     if (args.length != 2) {
       System.err.println(
@@ -216,12 +238,15 @@ private object YarnClusterDriver extends Logging with Matchers {
       System.exit(1)
     }
 
+    listener = new SaveExecutorInfo
     val sc = new SparkContext(new SparkConf().setMaster(args(0))
       .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+    sc.addSparkListener(listener)
     val status = new File(args(1))
     var result = "failure"
    try {
       val data = sc.parallelize(1 to 4, 4).collect().toSet
+      assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
       data should be (Set(1, 2, 3, 4))
       result = "success"
     } finally {
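
The test drives the new ExecutorInfo.logUrlMap field through the SaveExecutorInfo listener registered on the driver's SparkContext. The same pattern works in a user application; the sketch below is invented for illustration (the object name PrintExecutorLogUrls and the small job it runs are not from the patch) and assumes a Spark build that includes this change. It prints each executor's log links as executors register.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

    object PrintExecutorLogUrls {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("executor-log-url-demo"))
        // Same listener pattern as SaveExecutorInfo above, but printing instead of storing.
        sc.addSparkListener(new SparkListener {
          override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
            event.executorInfo.logUrlMap.foreach { case (name, url) =>
              println(s"executor ${event.executorId}: $name -> $url")
            }
          }
        })
        try {
          sc.parallelize(1 to 100, 4).count()  // run a small job so executors are actually launched
        } finally {
          sc.stop()
        }
      }
    }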