author    sharkd <sharkd.tu@gmail.com>    2016-07-12 10:10:35 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>    2016-07-12 10:10:35 -0700
commit    d513c99c19e229f72d03006e251725a43c13fefd (patch)
tree      699321cd48efb2929ab8b9fd610a981a4853e3da /yarn
parent    c377e49e38a290e5c4fbc178278069788674dfb7 (diff)
[SPARK-16414][YARN] Fix bugs for "Can not get user config when calling SparkHadoopUtil.get.conf on yarn cluster mode"
## What changes were proposed in this pull request?

When deploying Spark on YARN in cluster mode, the `SparkHadoopUtil` singleton was instantiated before `ApplicationMaster` in `ApplicationMaster.main`, so the `conf` held by the `SparkHadoopUtil` singleton did not include the user's configuration. Therefore, we should load the properties file with the Spark configuration and set its entries as system properties before `SparkHadoopUtil` is first instantiated.

## How was this patch tested?

Added a test case.

Author: sharkd <sharkd.tu@gmail.com>
Author: sharkdtu <sharkdtu@tencent.com>

Closes #14088 from sharkdtu/master.
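The fix is purely an ordering change: the `spark.hadoop.*` entries from the properties file must already be in the system properties when `SparkHadoopUtil` is constructed, because the singleton snapshots the configuration at that point. The sketch below is a minimal, self-contained illustration of that ordering dependency; it uses a hypothetical `HadoopUtilLike` stand-in rather than Spark's real `SparkHadoopUtil`, and the property name `spark.hadoop.key` is illustrative only.

```scala
// Minimal sketch (not Spark code) of the ordering bug described above: a
// singleton that snapshots system properties at construction time never
// sees properties that are set after it has been created.
object HadoopUtilOrderingSketch {

  // Hypothetical stand-in for SparkHadoopUtil: copies "spark.hadoop.*" system
  // properties into its own config map once, when it is constructed.
  class HadoopUtilLike {
    val conf: Map[String, String] = sys.props.toMap.collect {
      case (k, v) if k.startsWith("spark.hadoop.") =>
        k.stripPrefix("spark.hadoop.") -> v
    }
  }

  def main(args: Array[String]): Unit = {
    // Buggy order: the singleton is created before the properties-file entries
    // are pushed into system properties (as the old ApplicationMaster did).
    val early = new HadoopUtilLike
    sys.props("spark.hadoop.key") = "value" // simulates loading --properties-file
    println(early.conf.get("key"))          // None -- user config was missed

    // Fixed order (what the patch does in ApplicationMaster.main): set the
    // system properties first, then let the singleton be instantiated.
    val late = new HadoopUtilLike
    println(late.conf.get("key"))           // Some(value)
  }
}
```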
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  17
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala   45
2 files changed, 54 insertions, 8 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index b6f45dd634..c371ad616a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -50,14 +50,6 @@ private[spark] class ApplicationMaster(
client: YarnRMClient)
extends Logging {
- // Load the properties file with the Spark configuration and set entries as system properties,
- // so that user code run inside the AM also has access to them.
- if (args.propertiesFile != null) {
- Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
- sys.props(k) = v
- }
- }
-
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
@@ -743,6 +735,15 @@ object ApplicationMaster extends Logging {
def main(args: Array[String]): Unit = {
SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
+
+ // Load the properties file with the Spark configuration and set entries as system properties,
+ // so that user code run inside the AM also has access to them.
+ // Note: we must do this before SparkHadoopUtil is instantiated
+ if (amArgs.propertiesFile != null) {
+ Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
+ sys.props(k) = v
+ }
+ }
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClient)
System.exit(master.run())
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 9085fca1d3..874e3045b4 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.launcher._
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
@@ -106,6 +107,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
))
}
+ test("run Spark in yarn-cluster mode with using SparkHadoopUtil.conf") {
+ testYarnAppUseSparkHadoopUtilConf()
+ }
+
test("run Spark in yarn-client mode with additional jar") {
testWithAddJar(true)
}
@@ -181,6 +186,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
checkResult(finalState, result)
}
+ private def testYarnAppUseSparkHadoopUtilConf(): Unit = {
+ val result = File.createTempFile("result", null, tempDir)
+ val finalState = runSpark(false,
+ mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
+ appArgs = Seq("key=value", result.getAbsolutePath()),
+ extraConf = Map("spark.hadoop.key" -> "value"))
+ checkResult(finalState, result)
+ }
+
private def testWithAddJar(clientMode: Boolean): Unit = {
val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
val driverResult = File.createTempFile("driver", null, tempDir)
@@ -274,6 +288,37 @@ private object YarnClusterDriverWithFailure extends Logging with Matchers {
}
}
+private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matchers {
+ def main(args: Array[String]): Unit = {
+ if (args.length != 2) {
+ // scalastyle:off println
+ System.err.println(
+ s"""
+ |Invalid command line: ${args.mkString(" ")}
+ |
+ |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value] [result file]
+ """.stripMargin)
+ // scalastyle:on println
+ System.exit(1)
+ }
+
+ val sc = new SparkContext(new SparkConf()
+ .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
+ .setAppName("yarn test using SparkHadoopUtil's conf"))
+
+ val kv = args(0).split("=")
+ val status = new File(args(1))
+ var result = "failure"
+ try {
+ SparkHadoopUtil.get.conf.get(kv(0)) should be (kv(1))
+ result = "success"
+ } finally {
+ Files.write(result, status, StandardCharsets.UTF_8)
+ sc.stop()
+ }
+ }
+}
+
private object YarnClusterDriver extends Logging with Matchers {
val WAIT_TIMEOUT_MILLIS = 10000