author:    jerryshao <sshao@hortonworks.com>	2016-04-01 10:52:13 -0700
committer: Marcelo Vanzin <vanzin@cloudera.com>	2016-04-01 10:52:13 -0700
commit:    8ba2b7f28fee39c4839e5ea125bd25f5091a3a1e (patch)
tree:      bbe9310838ffdf087dc628d4c9993ba58c932ce1 /yarn/src/test/scala/org
parent:    58e6bc827f1f9dc1afee07dca1bee1f56553dd20 (diff)
[SPARK-12343][YARN] Simplify Yarn client and client argument
## What changes were proposed in this pull request?

Currently in Spark on YARN, configurations can be passed through SparkConf, environment variables, and command line arguments, and some of them are duplicated, e.g. between client arguments and SparkConf. This patch proposes to simplify the command line arguments.

## How was this patch tested?

This patch is tested manually and with unit tests.

CC vanzin tgravescs, please help review this proposal. The original purpose of this JIRA was to remove `ClientArguments` entirely, but during refactoring some arguments like `--class` and `--arg` turned out not to be easy to replace, so this patch removes most of the command line arguments and keeps only a minimal set.

Author: jerryshao <sshao@hortonworks.com>

Closes #11603 from jerryshao/SPARK-12343.
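To make the change concrete, here is a minimal sketch mirroring the test changes below (not the exact production code; `user.jar` and `extra.jar` are placeholder paths) of how a duplicated setting moves from a client argument into `SparkConf`:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.ClientArguments

// Before: extra jars were passed both as a command line flag and via conf,
// and ClientArguments needed the SparkConf to resolve them:
//   val args = new ClientArguments(
//     Array("--jar", "user.jar", "--addJars", "extra.jar"), sparkConf)

// After: the setting lives only in SparkConf, and ClientArguments parses
// just the remaining minimal argument set.
val sparkConf = new SparkConf()
  .set("spark.yarn.dist.jars", "extra.jar")
val args = new ClientArguments(Array("--jar", "user.jar"))
```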
Diffstat (limited to 'yarn/src/test/scala/org')
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala        | 19
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala |  8
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala   | 46
3 files changed, 59 insertions(+), 14 deletions(-)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 64723c361c..2eaafa072a 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -118,8 +118,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConf()
.set(SPARK_JARS, Seq(SPARK))
.set(USER_CLASS_PATH_FIRST, true)
+ .set("spark.yarn.dist.jars", ADDED)
val env = new MutableHashMap[String, String]()
- val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
+ val args = new ClientArguments(Array("--jar", USER))
populateClasspath(args, conf, sparkConf, env)
@@ -138,9 +139,11 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
test("Jar path propagation through SparkConf") {
- val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK))
- val client = createClient(sparkConf,
- args = Array("--jar", USER, "--addJars", ADDED))
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(SPARK_JARS, Seq(SPARK))
+ .set("spark.yarn.dist.jars", ADDED)
+ val client = createClient(sparkConf, args = Array("--jar", USER))
val tempDir = Utils.createTempDir()
try {
@@ -192,9 +195,9 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConf()
.set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup")
.set(MAX_APP_ATTEMPTS, 42)
- val args = new ClientArguments(Array(
- "--name", "foo-test-app",
- "--queue", "staging-queue"), sparkConf)
+ .set("spark.app.name", "foo-test-app")
+ .set(QUEUE_NAME, "staging-queue")
+ val args = new ClientArguments(Array())
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
@@ -346,7 +349,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
sparkConf: SparkConf,
conf: Configuration = new Configuration(),
args: Array[String] = Array()): Client = {
- val clientArgs = new ClientArguments(args, sparkConf)
+ val clientArgs = new ClientArguments(args)
val client = spy(new Client(clientArgs, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
any(classOf[Path]), anyShort())
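The same pattern covers application name and queue in the hunks above: the removed `--name` and `--queue` flags become plain configuration entries. A hedged sketch, assuming `QUEUE_NAME` in the test resolves to the `spark.yarn.queue` key:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.ClientArguments

// Name and queue as configuration instead of ClientArguments flags.
// "spark.yarn.queue" is assumed to be the key behind QUEUE_NAME.
val sparkConf = new SparkConf()
  .set("spark.app.name", "foo-test-app")
  .set("spark.yarn.queue", "staging-queue")
val args = new ClientArguments(Array())  // only the minimal set remains
```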
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 0587444a33..a641a6e73e 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -90,12 +90,13 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
val args = Array(
- "--executor-cores", "5",
- "--executor-memory", "2048",
"--jar", "somejar.jar",
"--class", "SomeClass")
val sparkConfClone = sparkConf.clone()
- sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
+ sparkConfClone
+ .set("spark.executor.instances", maxExecutors.toString)
+ .set("spark.executor.cores", "5")
+ .set("spark.executor.memory", "2048")
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
@@ -103,7 +104,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
sparkConfClone,
rmClient,
appAttemptId,
- new ApplicationMasterArguments(args),
new SecurityManager(sparkConf))
}
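YarnAllocator follows the same simplification: executor sizing leaves the `--executor-cores`/`--executor-memory` arguments (and the `ApplicationMasterArguments` constructor parameter) and is read from configuration instead. A minimal sketch of the equivalent conf, with values taken from the test above:

```scala
import org.apache.spark.SparkConf

// Executor count and sizing now come from configuration rather than from
// ApplicationMasterArguments; "2048" is interpreted as megabytes, as in
// the test.
val sparkConf = new SparkConf()
  .set("spark.executor.instances", "5")
  .set("spark.executor.cores", "5")
  .set("spark.executor.memory", "2048")
```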
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 26520529ec..b2b4d84f53 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -85,6 +85,35 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
testBasicYarnApp(false)
}
+ test("run Spark in yarn-client mode with different configurations") {
+ testBasicYarnApp(true,
+ Map(
+ "spark.driver.memory" -> "512m",
+ "spark.executor.cores" -> "1",
+ "spark.executor.memory" -> "512m",
+ "spark.executor.instances" -> "2"
+ ))
+ }
+
+ test("run Spark in yarn-cluster mode with different configurations") {
+ testBasicYarnApp(false,
+ Map(
+ "spark.driver.memory" -> "512m",
+ "spark.driver.cores" -> "1",
+ "spark.executor.cores" -> "1",
+ "spark.executor.memory" -> "512m",
+ "spark.executor.instances" -> "2"
+ ))
+ }
+
+ test("run Spark in yarn-client mode with additional jar") {
+ testWithAddJar(true)
+ }
+
+ test("run Spark in yarn-cluster mode with additional jar") {
+ testWithAddJar(false)
+ }
+
test("run Spark in yarn-cluster mode unsuccessfully") {
// Don't provide arguments so the driver will fail.
val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
@@ -139,13 +168,26 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
}
}
- private def testBasicYarnApp(clientMode: Boolean): Unit = {
+ private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
- appArgs = Seq(result.getAbsolutePath()))
+ appArgs = Seq(result.getAbsolutePath()),
+ extraConf = conf)
checkResult(finalState, result)
}
+ private def testWithAddJar(clientMode: Boolean): Unit = {
+ val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
+ val driverResult = File.createTempFile("driver", null, tempDir)
+ val executorResult = File.createTempFile("executor", null, tempDir)
+ val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+ appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+ extraClassPath = Seq(originalJar.getPath()),
+ extraJars = Seq("local:" + originalJar.getPath()))
+ checkResult(finalState, driverResult, "ORIGINAL")
+ checkResult(finalState, executorResult, "ORIGINAL")
+ }
+
private def testPySpark(clientMode: Boolean): Unit = {
val primaryPyFile = new File(tempDir, "test.py")
Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)
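For reference, a hedged sketch of the setup behind `testWithAddJar`: build a jar containing a known resource with Spark's internal test helper, then ship it with a `local:` URI so both driver and executors can read the resource back (`tempDir` here is an assumed scratch directory):

```scala
import java.io.File
import org.apache.spark.TestUtils

// Create a jar holding a single resource with known contents, then point
// the application at it via a "local:" URI (already on each node's disk).
val tempDir = new File(System.getProperty("java.io.tmpdir"))
val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
val extraJars = Seq("local:" + originalJar.getPath())
```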