diff options
Diffstat (limited to 'yarn/src')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 12 | ||||
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 31 |
2 files changed, 38 insertions, 5 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index ee2002a35f..408cf09b9b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -56,7 +56,7 @@ class ExecutorRunnable( var rpc: YarnRPC = YarnRPC.create(conf) var nmClient: NMClient = _ val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - lazy val env = prepareEnvironment + lazy val env = prepareEnvironment(container) def run = { logInfo("Starting Executor Container") @@ -254,7 +254,7 @@ class ExecutorRunnable( localResources } - private def prepareEnvironment: HashMap[String, String] = { + private def prepareEnvironment(container: Container): HashMap[String, String] = { val env = new HashMap[String, String]() val extraCp = sparkConf.getOption("spark.executor.extraClassPath") Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp) @@ -270,6 +270,14 @@ class ExecutorRunnable( YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) } + // Add log urls + sys.env.get("SPARK_USER").foreach { user => + val baseUrl = "http://%s/node/containerlogs/%s/%s" + .format(container.getNodeHttpAddress, ConverterUtils.toString(container.getId), user) + env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=0" + env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=0" + } + System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v } env } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 7165918e1b..eda40efc4c 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -21,16 +21,17 @@ import java.io.File import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ +import scala.collection.mutable import com.google.common.base.Charsets import com.google.common.io.Files -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} - import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.util.Utils class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { @@ -143,6 +144,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit var result = File.createTempFile("result", null, tempDir) YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath())) checkResult(result) + + // verify log urls are present + YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } } test("run Spark in yarn-cluster mode") { @@ -156,6 +162,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit "--num-executors", "1") Client.main(args) checkResult(result) + + // verify log urls are present. + YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } } test("run Spark in yarn-cluster mode unsuccessfully") { @@ -203,8 +214,19 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit } +private class SaveExecutorInfo extends SparkListener { + val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() + + override def onExecutorAdded(executor : SparkListenerExecutorAdded) { + addedExecutorInfos(executor.executorId) = executor.executorInfo + } +} + private object YarnClusterDriver extends Logging with Matchers { + val WAIT_TIMEOUT_MILLIS = 10000 + var listener: SaveExecutorInfo = null + def main(args: Array[String]) = { if (args.length != 2) { System.err.println( @@ -216,12 +238,15 @@ private object YarnClusterDriver extends Logging with Matchers { System.exit(1) } + listener = new SaveExecutorInfo val sc = new SparkContext(new SparkConf().setMaster(args(0)) .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) + sc.addSparkListener(listener) val status = new File(args(1)) var result = "failure" try { val data = sc.parallelize(1 to 4, 4).collect().toSet + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) data should be (Set(1, 2, 3, 4)) result = "success" } finally { |