From 8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Fri, 1 Apr 2016 10:52:13 -0700
Subject: [SPARK-12343][YARN] Simplify Yarn client and client argument

## What changes were proposed in this pull request?

Currently in Spark on YARN, configurations can be passed through SparkConf, environment variables, and command-line arguments, and some of them are duplicated, e.g. between the client arguments and SparkConf. This change therefore simplifies the command-line arguments.

## How was this patch tested?

This patch was tested manually and with unit tests.

CC vanzin tgravescs, please help review this proposal.

The original purpose of this JIRA was to remove `ClientArguments` entirely, but during the refactoring some arguments such as `--class` and `--arg` proved hard to replace, so this change removes most of the command-line arguments and keeps only a minimal set.

Author: jerryshao

Closes #11603 from jerryshao/SPARK-12343.
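As a rough sketch of the migration (illustrative only, not code from this patch: `userJar` and `extraJars` are placeholder values, and `spark.yarn.queue` is assumed to be the key behind the `QUEUE_NAME` constant used in the tests):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.ClientArguments

val userJar = "local:/path/to/user.jar"      // placeholder
val extraJars = "local:/path/to/extra.jar"   // placeholder

// Before: settings were duplicated between command-line flags and SparkConf, e.g.
//   new ClientArguments(Array(
//     "--jar", userJar,
//     "--addJars", extraJars,
//     "--name", "foo-test-app",
//     "--queue", "staging-queue"), sparkConf)

// After: everything except the minimal set moves into SparkConf.
val sparkConf = new SparkConf()
  .set("spark.yarn.dist.jars", extraJars)    // replaces --addJars
  .set("spark.app.name", "foo-test-app")     // replaces --name
  .set("spark.yarn.queue", "staging-queue")  // replaces --queue (QUEUE_NAME)

// ClientArguments no longer takes a SparkConf, and only a minimal set of
// flags such as --jar, --class and --arg remains.
val args = new ClientArguments(Array("--jar", userJar))
```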
Path("/")).when(client).copyFileToRemote(any(classOf[Path]), any(classOf[Path]), anyShort()) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 0587444a33..a641a6e73e 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -90,12 +90,13 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter def createAllocator(maxExecutors: Int = 5): YarnAllocator = { val args = Array( - "--executor-cores", "5", - "--executor-memory", "2048", "--jar", "somejar.jar", "--class", "SomeClass") val sparkConfClone = sparkConf.clone() - sparkConfClone.set("spark.executor.instances", maxExecutors.toString) + sparkConfClone + .set("spark.executor.instances", maxExecutors.toString) + .set("spark.executor.cores", "5") + .set("spark.executor.memory", "2048") new YarnAllocator( "not used", mock(classOf[RpcEndpointRef]), @@ -103,7 +104,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter sparkConfClone, rmClient, appAttemptId, - new ApplicationMasterArguments(args), new SecurityManager(sparkConf)) } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 26520529ec..b2b4d84f53 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -85,6 +85,35 @@ class YarnClusterSuite extends BaseYarnClusterSuite { testBasicYarnApp(false) } + test("run Spark in yarn-client mode with different configurations") { + testBasicYarnApp(true, + Map( + "spark.driver.memory" -> "512m", + "spark.executor.cores" -> "1", + "spark.executor.memory" -> "512m", + "spark.executor.instances" -> "2" + )) + } + + test("run Spark in yarn-cluster mode with different configurations") { + testBasicYarnApp(true, + Map( + "spark.driver.memory" -> "512m", + "spark.driver.cores" -> "1", + "spark.executor.cores" -> "1", + "spark.executor.memory" -> "512m", + "spark.executor.instances" -> "2" + )) + } + + test("run Spark in yarn-client mode with additional jar") { + testWithAddJar(true) + } + + test("run Spark in yarn-cluster mode with additional jar") { + testWithAddJar(false) + } + test("run Spark in yarn-cluster mode unsuccessfully") { // Don't provide arguments so the driver will fail. 
     val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
@@ -139,13 +168,26 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     }
   }
 
-  private def testBasicYarnApp(clientMode: Boolean): Unit = {
+  private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
-      appArgs = Seq(result.getAbsolutePath()))
+      appArgs = Seq(result.getAbsolutePath()),
+      extraConf = conf)
     checkResult(finalState, result)
   }
 
+  private def testWithAddJar(clientMode: Boolean): Unit = {
+    val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+    val driverResult = File.createTempFile("driver", null, tempDir)
+    val executorResult = File.createTempFile("executor", null, tempDir)
+    val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+      extraClassPath = Seq(originalJar.getPath()),
+      extraJars = Seq("local:" + originalJar.getPath()))
+    checkResult(finalState, driverResult, "ORIGINAL")
+    checkResult(finalState, executorResult, "ORIGINAL")
+  }
+
   private def testPySpark(clientMode: Boolean): Unit = {
     val primaryPyFile = new File(tempDir, "test.py")
     Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)