aboutsummaryrefslogtreecommitdiff
path: root/yarn/src/test/scala/org/apache/spark/deploy
diff options
context:
space:
mode:
Diffstat (limited to 'yarn/src/test/scala/org/apache/spark/deploy')
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala2
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala31
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala8
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala46
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala24
5 files changed, 74 insertions, 37 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 2f3a31cb04..9c3b18e4ec 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -53,7 +53,7 @@ abstract class BaseYarnClusterSuite
|log4j.logger.org.apache.hadoop=WARN
|log4j.logger.org.eclipse.jetty=WARN
|log4j.logger.org.mortbay=WARN
- |log4j.logger.org.spark-project.jetty=WARN
+ |log4j.logger.org.spark_project.jetty=WARN
""".stripMargin
private var yarnCluster: MiniYARNCluster = _
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 24472e006b..74e268dc48 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.io.File
+import java.io.{File, FileOutputStream}
import java.net.URI
import java.util.Properties
@@ -118,10 +118,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConf()
.set(SPARK_JARS, Seq(SPARK))
.set(USER_CLASS_PATH_FIRST, true)
+ .set("spark.yarn.dist.jars", ADDED)
val env = new MutableHashMap[String, String]()
- val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+ val args = new ClientArguments(Array("--jar", USER))
- populateClasspath(args, conf, sparkConf, env, true)
+ populateClasspath(args, conf, sparkConf, env)
val cp = env("CLASSPATH").split(":|;|<CPS>")
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -138,9 +139,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
test("Jar path propagation through SparkConf") {
- val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK))
- val client = createClient(sparkConf,
- args = Array("--jar", USER, "--addJars", ADDED))
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(SPARK_JARS, Seq(SPARK))
+ .set("spark.yarn.dist.jars", ADDED)
+ val client = createClient(sparkConf, args = Array("--jar", USER))
val tempDir = Utils.createTempDir()
try {
@@ -178,8 +181,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
"/remotePath/1:/remotePath/2")
val env = new MutableHashMap[String, String]()
- populateClasspath(null, conf, sparkConf, env, false,
- extraClassPath = Some("/localPath/my1.jar"))
+ populateClasspath(null, conf, sparkConf, env, extraClassPath = Some("/localPath/my1.jar"))
val cp = classpath(env)
cp should contain ("/remotePath/spark.jar")
cp should contain ("/remotePath/my1.jar")
@@ -193,9 +195,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConf()
.set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
.set(MAX_APP_ATTEMPTS, 42)
- val args = new ClientArguments(Array(
- "--name", "foo-test-app",
- "--queue", "staging-queue"), sparkConf)
+ .set("spark.app.name", "foo-test-app")
+ .set(QUEUE_NAME, "staging-queue")
+ val args = new ClientArguments(Array())
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
@@ -271,9 +273,10 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
test("distribute local spark jars") {
val temp = Utils.createTempDir()
- val jarsDir = new File(temp, "lib")
+ val jarsDir = new File(temp, "jars")
assert(jarsDir.mkdir())
val jar = TestUtils.createJarWithFiles(Map(), jarsDir)
+ new FileOutputStream(new File(temp, "RELEASE")).close()
val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
val client = createClient(sparkConf)
@@ -346,7 +349,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
sparkConf: SparkConf,
conf: Configuration = new Configuration(),
args: Array[String] = Array()): Client = {
- val clientArgs = new ClientArguments(args, sparkConf)
+ val clientArgs = new ClientArguments(args)
val client = spy(new Client(clientArgs, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
any(classOf[Path]), anyShort())
@@ -355,7 +358,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
private def classpath(client: Client): Array[String] = {
val env = new MutableHashMap[String, String]()
- populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+ populateClasspath(null, client.hadoopConf, client.sparkConf, env)
classpath(env)
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 0587444a33..a641a6e73e 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -90,12 +90,13 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
val args = Array(
- "--executor-cores", "5",
- "--executor-memory", "2048",
"--jar", "somejar.jar",
"--class", "SomeClass")
val sparkConfClone = sparkConf.clone()
- sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
+ sparkConfClone
+ .set("spark.executor.instances", maxExecutors.toString)
+ .set("spark.executor.cores", "5")
+ .set("spark.executor.memory", "2048")
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
@@ -103,7 +104,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
sparkConfClone,
rmClient,
appAttemptId,
- new ApplicationMasterArguments(args),
new SecurityManager(sparkConf))
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 26520529ec..b2b4d84f53 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -85,6 +85,35 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
testBasicYarnApp(false)
}
+ test("run Spark in yarn-client mode with different configurations") {
+ testBasicYarnApp(true,
+ Map(
+ "spark.driver.memory" -> "512m",
+ "spark.executor.cores" -> "1",
+ "spark.executor.memory" -> "512m",
+ "spark.executor.instances" -> "2"
+ ))
+ }
+
+ test("run Spark in yarn-cluster mode with different configurations") {
+ testBasicYarnApp(true,
+ Map(
+ "spark.driver.memory" -> "512m",
+ "spark.driver.cores" -> "1",
+ "spark.executor.cores" -> "1",
+ "spark.executor.memory" -> "512m",
+ "spark.executor.instances" -> "2"
+ ))
+ }
+
+ test("run Spark in yarn-client mode with additional jar") {
+ testWithAddJar(true)
+ }
+
+ test("run Spark in yarn-cluster mode with additional jar") {
+ testWithAddJar(false)
+ }
+
test("run Spark in yarn-cluster mode unsuccessfully") {
// Don't provide arguments so the driver will fail.
val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
@@ -139,13 +168,26 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
}
}
- private def testBasicYarnApp(clientMode: Boolean): Unit = {
+ private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
- appArgs = Seq(result.getAbsolutePath()))
+ appArgs = Seq(result.getAbsolutePath()),
+ extraConf = conf)
checkResult(finalState, result)
}
+ private def testWithAddJar(clientMode: Boolean): Unit = {
+ val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+ val driverResult = File.createTempFile("driver", null, tempDir)
+ val executorResult = File.createTempFile("executor", null, tempDir)
+ val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+ appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+ extraClassPath = Seq(originalJar.getPath()),
+ extraJars = Seq("local:" + originalJar.getPath()))
+ checkResult(finalState, driverResult, "ORIGINAL")
+ checkResult(finalState, executorResult, "ORIGINAL")
+ }
+
private def testPySpark(clientMode: Boolean): Unit = {
val primaryPyFile = new File(tempDir, "test.py")
Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index de14e36f4e..fe09808ae5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -101,22 +101,18 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
viewAcls match {
- case Some(vacls) => {
+ case Some(vacls) =>
val aclSet = vacls.split(',').map(_.trim).toSet
assert(aclSet.contains(System.getProperty("user.name", "invalid")))
- }
- case None => {
+ case None =>
fail()
- }
}
modifyAcls match {
- case Some(macls) => {
+ case Some(macls) =>
val aclSet = macls.split(',').map(_.trim).toSet
assert(aclSet.contains(System.getProperty("user.name", "invalid")))
- }
- case None => {
+ case None =>
fail()
- }
}
}
@@ -135,26 +131,22 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
viewAcls match {
- case Some(vacls) => {
+ case Some(vacls) =>
val aclSet = vacls.split(',').map(_.trim).toSet
assert(aclSet.contains("user1"))
assert(aclSet.contains("user2"))
assert(aclSet.contains(System.getProperty("user.name", "invalid")))
- }
- case None => {
+ case None =>
fail()
- }
}
modifyAcls match {
- case Some(macls) => {
+ case Some(macls) =>
val aclSet = macls.split(',').map(_.trim).toSet
assert(aclSet.contains("user3"))
assert(aclSet.contains("user4"))
assert(aclSet.contains(System.getProperty("user.name", "invalid")))
- }
- case None => {
+ case None =>
fail()
- }
}
}