Diffstat (limited to 'yarn/src/test/scala/org/apache/spark/deploy')
5 files changed, 74 insertions, 37 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 2f3a31cb04..9c3b18e4ec 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -53,7 +53,7 @@ abstract class BaseYarnClusterSuite
     |log4j.logger.org.apache.hadoop=WARN
     |log4j.logger.org.eclipse.jetty=WARN
     |log4j.logger.org.mortbay=WARN
-    |log4j.logger.org.spark-project.jetty=WARN
+    |log4j.logger.org.spark_project.jetty=WARN
     """.stripMargin

  private var yarnCluster: MiniYARNCluster = _
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 24472e006b..74e268dc48 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.deploy.yarn

-import java.io.File
+import java.io.{File, FileOutputStream}
 import java.net.URI
 import java.util.Properties

@@ -118,10 +118,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val sparkConf = new SparkConf()
       .set(SPARK_JARS, Seq(SPARK))
       .set(USER_CLASS_PATH_FIRST, true)
+      .set("spark.yarn.dist.jars", ADDED)
     val env = new MutableHashMap[String, String]()
-    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+    val args = new ClientArguments(Array("--jar", USER))

-    populateClasspath(args, conf, sparkConf, env, true)
+    populateClasspath(args, conf, sparkConf, env)

     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -138,9 +139,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   }

   test("Jar path propagation through SparkConf") {
-    val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK))
-    val client = createClient(sparkConf,
-      args = Array("--jar", USER, "--addJars", ADDED))
+    val conf = new Configuration()
+    val sparkConf = new SparkConf()
+      .set(SPARK_JARS, Seq(SPARK))
+      .set("spark.yarn.dist.jars", ADDED)
+    val client = createClient(sparkConf, args = Array("--jar", USER))

     val tempDir = Utils.createTempDir()
     try {
@@ -178,8 +181,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       "/remotePath/1:/remotePath/2")

     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, conf, sparkConf, env, false,
-      extraClassPath = Some("/localPath/my1.jar"))
+    populateClasspath(null, conf, sparkConf, env, extraClassPath = Some("/localPath/my1.jar"))
     val cp = classpath(env)
     cp should contain ("/remotePath/spark.jar")
     cp should contain ("/remotePath/my1.jar")
@@ -193,9 +195,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val sparkConf = new SparkConf()
       .set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
       .set(MAX_APP_ATTEMPTS, 42)
-    val args = new ClientArguments(Array(
-      "--name", "foo-test-app",
-      "--queue", "staging-queue"), sparkConf)
+      .set("spark.app.name", "foo-test-app")
+      .set(QUEUE_NAME, "staging-queue")
+    val args = new ClientArguments(Array())

     val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
     val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
@@ -271,9 +273,10 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
   test("distribute local spark jars") {
     val temp = Utils.createTempDir()
-    val jarsDir = new File(temp, "lib")
+    val jarsDir = new File(temp, "jars")
     assert(jarsDir.mkdir())
     val jar = TestUtils.createJarWithFiles(Map(), jarsDir)
+    new FileOutputStream(new File(temp, "RELEASE")).close()
     val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
     val client = createClient(sparkConf)
@@ -346,7 +349,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
       sparkConf: SparkConf,
       conf: Configuration = new Configuration(),
       args: Array[String] = Array()): Client = {
-    val clientArgs = new ClientArguments(args, sparkConf)
+    val clientArgs = new ClientArguments(args)
     val client = spy(new Client(clientArgs, conf, sparkConf))
     doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
       any(classOf[Path]), anyShort())
@@ -355,7 +358,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll

   private def classpath(client: Client): Array[String] = {
     val env = new MutableHashMap[String, String]()
-    populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+    populateClasspath(null, client.hadoopConf, client.sparkConf, env)
     classpath(env)
   }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 0587444a33..a641a6e73e 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -90,12 +90,13 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
     val args = Array(
-      "--executor-cores", "5",
-      "--executor-memory", "2048",
       "--jar", "somejar.jar",
       "--class", "SomeClass")
     val sparkConfClone = sparkConf.clone()
-    sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
+    sparkConfClone
+      .set("spark.executor.instances", maxExecutors.toString)
+      .set("spark.executor.cores", "5")
+      .set("spark.executor.memory", "2048")
     new YarnAllocator(
       "not used",
       mock(classOf[RpcEndpointRef]),
@@ -103,7 +104,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       sparkConfClone,
       rmClient,
       appAttemptId,
-      new ApplicationMasterArguments(args),
       new SecurityManager(sparkConf))
   }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 26520529ec..b2b4d84f53 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -85,6 +85,35 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     testBasicYarnApp(false)
   }

+  test("run Spark in yarn-client mode with different configurations") {
+    testBasicYarnApp(true,
+      Map(
+        "spark.driver.memory" -> "512m",
+        "spark.executor.cores" -> "1",
+        "spark.executor.memory" -> "512m",
+        "spark.executor.instances" -> "2"
+      ))
+  }
+
+  test("run Spark in yarn-cluster mode with different configurations") {
+    testBasicYarnApp(false,
+      Map(
+        "spark.driver.memory" -> "512m",
+        "spark.driver.cores" -> "1",
+        "spark.executor.cores" -> "1",
+        "spark.executor.memory" -> "512m",
+        "spark.executor.instances" -> "2"
+      ))
+  }
+
+  test("run Spark in yarn-client mode with additional jar") {
+    testWithAddJar(true)
+  }
+
+  test("run Spark in yarn-cluster mode with additional jar") {
+    testWithAddJar(false)
+  }
+
   test("run Spark in yarn-cluster mode unsuccessfully") {
     // Don't provide arguments so the driver will fail.
     val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
@@ -139,13 +168,26 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     }
   }

-  private def testBasicYarnApp(clientMode: Boolean): Unit = {
+  private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
-      appArgs = Seq(result.getAbsolutePath()))
+      appArgs = Seq(result.getAbsolutePath()),
+      extraConf = conf)
     checkResult(finalState, result)
   }

+  private def testWithAddJar(clientMode: Boolean): Unit = {
+    val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+    val driverResult = File.createTempFile("driver", null, tempDir)
+    val executorResult = File.createTempFile("executor", null, tempDir)
+    val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+      extraClassPath = Seq(originalJar.getPath()),
+      extraJars = Seq("local:" + originalJar.getPath()))
+    checkResult(finalState, driverResult, "ORIGINAL")
+    checkResult(finalState, executorResult, "ORIGINAL")
+  }
+
   private def testPySpark(clientMode: Boolean): Unit = {
     val primaryPyFile = new File(tempDir, "test.py")
     Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index de14e36f4e..fe09808ae5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -101,22 +101,18 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)

     viewAcls match {
-      case Some(vacls) => {
+      case Some(vacls) =>
         val aclSet = vacls.split(',').map(_.trim).toSet
         assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
+      case None =>
         fail()
-      }
     }
     modifyAcls match {
-      case Some(macls) => {
+      case Some(macls) =>
         val aclSet = macls.split(',').map(_.trim).toSet
         assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
+      case None =>
         fail()
-      }
     }
   }
@@ -135,26 +131,22 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)

     viewAcls match {
-      case Some(vacls) => {
+      case Some(vacls) =>
         val aclSet = vacls.split(',').map(_.trim).toSet
         assert(aclSet.contains("user1"))
         assert(aclSet.contains("user2"))
         assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
+      case None =>
         fail()
-      }
     }
     modifyAcls match {
-      case Some(macls) => {
+      case Some(macls) =>
         val aclSet = macls.split(',').map(_.trim).toSet
         assert(aclSet.contains("user3"))
         assert(aclSet.contains("user4"))
         assert(aclSet.contains(System.getProperty("user.name", "invalid")))
-      }
-      case None => {
+      case None =>
         fail()
-      }
     }
   }
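
The test changes above all track one API migration: ClientArguments no longer accepts --addJars, --name, --queue, --executor-cores, or --executor-memory, and YarnAllocator no longer takes an ApplicationMasterArguments; the equivalent settings now travel through SparkConf. A minimal sketch of the new-style setup follows, assuming only the SparkConf keys exercised by these tests; the object name, jar path, and literal values are hypothetical examples, not part of the patch:

import org.apache.spark.SparkConf

// Sketch only: shows where the settings carried by the removed CLI
// flags now live. All values below are illustrative.
object NewStyleConfSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.yarn.dist.jars", "local:/opt/libs/extra.jar") // was --addJars
      .set("spark.app.name", "foo-test-app")                    // was --name
      .set("spark.yarn.queue", "staging-queue")                 // was --queue
      .set("spark.executor.cores", "5")                         // was --executor-cores
      .set("spark.executor.memory", "2048m")                    // was --executor-memory

    // ClientArguments is now built from the remaining flags only,
    // e.g. new ClientArguments(Array("--jar", userJar)); everything
    // else is read from the conf.
    println(sparkConf.toDebugString)
  }
}

Because every consumer reads the same SparkConf, resource sizing no longer has to be threaded through constructor arguments, which is why YarnAllocatorSuite can drop ApplicationMasterArguments from the YarnAllocator constructor.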