From d513c99c19e229f72d03006e251725a43c13fefd Mon Sep 17 00:00:00 2001
From: sharkd
Date: Tue, 12 Jul 2016 10:10:35 -0700
Subject: [SPARK-16414][YARN] Fix bugs for "Can not get user config when
 calling SparkHadoopUtil.get.conf on yarn cluster mode"

## What changes were proposed in this pull request?

When deploying Spark in YARN cluster mode, the `SparkHadoopUtil` singleton was instantiated before `ApplicationMaster` in `ApplicationMaster.main`, so the `conf` held by the singleton did not include the user's configuration. We should therefore load the properties file with the Spark configuration and set its entries as system properties before `SparkHadoopUtil` is first instantiated.

## How was this patch tested?

Added a test case.

Author: sharkd
Author: sharkdtu

Closes #14088 from sharkdtu/master.
---
 .../spark/deploy/yarn/ApplicationMaster.scala | 17 ++++----
 .../spark/deploy/yarn/YarnClusterSuite.scala  | 45 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index b6f45dd634..c371ad616a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -50,14 +50,6 @@ private[spark] class ApplicationMaster(
     client: YarnRMClient)
   extends Logging {
 
-  // Load the properties file with the Spark configuration and set entries as system properties,
-  // so that user code run inside the AM also has access to them.
-  if (args.propertiesFile != null) {
-    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
-      sys.props(k) = v
-    }
-  }
-
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
 
@@ -743,6 +735,15 @@ object ApplicationMaster extends Logging {
   def main(args: Array[String]): Unit = {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
+
+    // Load the properties file with the Spark configuration and set entries as system properties,
+    // so that user code run inside the AM also has access to them.
+    // Note: we must do this before SparkHadoopUtil is instantiated.
+    if (amArgs.propertiesFile != null) {
+      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
+        sys.props(k) = v
+      }
+    }
     SparkHadoopUtil.get.runAsSparkUser { () =>
       master = new ApplicationMaster(amArgs, new YarnRMClient)
       System.exit(master.run())
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 9085fca1d3..874e3045b4 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher._
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
@@ -106,6 +107,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     ))
   }
 
+  test("run Spark in yarn-cluster mode using SparkHadoopUtil.conf") {
+    testYarnAppUseSparkHadoopUtilConf()
+  }
+
   test("run Spark in yarn-client mode with additional jar") {
     testWithAddJar(true)
   }
@@ -181,6 +186,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
+  private def testYarnAppUseSparkHadoopUtilConf(): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(false,
+      mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
+      appArgs = Seq("key=value", result.getAbsolutePath()),
+      extraConf = Map("spark.hadoop.key" -> "value"))
+    checkResult(finalState, result)
+  }
+
   private def testWithAddJar(clientMode: Boolean): Unit = {
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
     val driverResult = File.createTempFile("driver", null, tempDir)
@@ -274,6 +288,37 @@ private object YarnClusterDriverWithFailure extends Logging with Matchers {
   }
 }
 
+private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matchers {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      // scalastyle:off println
+      System.err.println(
+        s"""
+        |Invalid command line: ${args.mkString(" ")}
+        |
+        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value] [result file]
+        """.stripMargin)
+      // scalastyle:on println
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf()
+      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
+      .setAppName("yarn test using SparkHadoopUtil's conf"))
+
+    val kv = args(0).split("=")
+    val status = new File(args(1))
+    var result = "failure"
+    try {
+      SparkHadoopUtil.get.conf.get(kv(0)) should be (kv(1))
+      result = "success"
+    } finally {
+      Files.write(result, status, StandardCharsets.UTF_8)
+      sc.stop()
+    }
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000
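The heart of the fix is an ordering dependency: per the description above, `SparkHadoopUtil` builds its `conf` from system properties when it is first instantiated, so the properties file must be loaded before that first access. Below is a minimal, self-contained Scala sketch of this lazy-singleton pattern; the names `ConfSingleton` and `OrderingDemo` are hypothetical stand-ins, not Spark's actual classes.

```scala
// Sketch of the ordering bug: a lazily initialized singleton snapshots
// system properties at first access, so properties set afterwards are
// never visible to it.
object ConfSingleton {
  // Snapshot of the "spark.*" system properties, taken once at first access.
  lazy val conf: Map[String, String] =
    sys.props.toMap.filter { case (k, _) => k.startsWith("spark.") }
}

object OrderingDemo {
  def main(args: Array[String]): Unit = {
    // Correct order (what the patch enforces): set system properties first...
    sys.props("spark.hadoop.key") = "value"
    // ...then touch the singleton, which now sees the user's configuration.
    println(ConfSingleton.conf.get("spark.hadoop.key")) // prints Some(value)
    // Had ConfSingleton.conf been accessed before sys.props was updated,
    // the snapshot would lack the entry, mirroring the reported bug.
  }
}
```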