diff options
author | Sandy Ryza <sandy@cloudera.com> | 2015-01-08 09:25:43 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-01-08 09:25:43 -0800 |
commit | 8d45834debc6986e61831d0d6e982d5528dccc51 (patch) | |
tree | 4233ccff0dab1e94e2b20c953d23669426ae6ecd /yarn/src/test | |
parent | c08238570c4aa53b0bed24d3304ce104e9bd65ce (diff) | |
download | spark-8d45834debc6986e61831d0d6e982d5528dccc51.tar.gz spark-8d45834debc6986e61831d0d6e982d5528dccc51.tar.bz2 spark-8d45834debc6986e61831d0d6e982d5528dccc51.zip |
SPARK-5087. [YARN] Merge yarn.Client and yarn.ClientBase
Author: Sandy Ryza <sandy@cloudera.com>
Closes #3896 from sryza/sandy-spark-5087 and squashes the following commits:
65611d0 [Sandy Ryza] Review feedback
3294176 [Sandy Ryza] SPARK-5087. [YARN] Merge yarn.Client and yarn.ClientBase
Diffstat (limited to 'yarn/src/test')
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala (renamed from yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala) | 64 |
1 file changed, 26 insertions(+), 38 deletions(-)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 17b79ae1d8..aad50015b7 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -41,38 +41,38 @@ import scala.util.Try import org.apache.spark.{SparkException, SparkConf} import org.apache.spark.util.Utils -class ClientBaseSuite extends FunSuite with Matchers { +class ClientSuite extends FunSuite with Matchers { test("default Yarn application classpath") { - ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) + Client.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) } test("default MR application classpath") { - ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) + Client.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) } test("resultant classpath for an application that defines a classpath for YARN") { withAppConf(Fixtures.mapYARNAppConf) { conf => val env = newEnv - ClientBase.populateHadoopClasspath(conf, env) + Client.populateHadoopClasspath(conf, env) classpath(env) should be( - flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath)) + flatten(Fixtures.knownYARNAppCP, Client.getDefaultMRApplicationClasspath)) } } test("resultant classpath for an application that defines a classpath for MR") { withAppConf(Fixtures.mapMRAppConf) { conf => val env = newEnv - ClientBase.populateHadoopClasspath(conf, env) + Client.populateHadoopClasspath(conf, env) classpath(env) should be( - flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) + flatten(Client.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) } } test("resultant classpath for an application that defines both classpaths, YARN and MR") { withAppConf(Fixtures.mapAppConf) { 
conf => val env = newEnv - ClientBase.populateHadoopClasspath(conf, env) + Client.populateHadoopClasspath(conf, env) classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) } } @@ -83,16 +83,16 @@ class ClientBaseSuite extends FunSuite with Matchers { test("Local jar URIs") { val conf = new Configuration() - val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) + val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK) val env = new MutableHashMap[String, String]() val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - ClientBase.populateClasspath(args, conf, sparkConf, env) + Client.populateClasspath(args, conf, sparkConf, env) val cp = env("CLASSPATH").split(File.pathSeparator) s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => val uri = new URI(entry) - if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) { + if (Client.LOCAL_SCHEME.equals(uri.getScheme())) { cp should contain (uri.getPath()) } else { cp should not contain (uri.getPath()) @@ -100,31 +100,30 @@ class ClientBaseSuite extends FunSuite with Matchers { }) cp should contain (Environment.PWD.$()) cp should contain (s"${Environment.PWD.$()}${File.separator}*") - cp should not contain (ClientBase.SPARK_JAR) - cp should not contain (ClientBase.APP_JAR) + cp should not contain (Client.SPARK_JAR) + cp should not contain (Client.APP_JAR) } test("Jar path propagation through SparkConf") { val conf = new Configuration() - val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) - val yarnConf = new YarnConfiguration() + val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK) val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - val client = spy(new DummyClient(args, conf, sparkConf, yarnConf)) + val client = spy(new Client(args, conf, sparkConf)) doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), any(classOf[Path]), anyShort(), anyBoolean()) val 
tempDir = Utils.createTempDir() try { client.prepareLocalResources(tempDir.getAbsolutePath()) - sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER)) + sparkConf.getOption(Client.CONF_SPARK_USER_JAR) should be (Some(USER)) // The non-local path should be propagated by name only, since it will end up in the app's // staging dir. val expected = ADDED.split(",") .map(p => { val uri = new URI(p) - if (ClientBase.LOCAL_SCHEME == uri.getScheme()) { + if (Client.LOCAL_SCHEME == uri.getScheme()) { p } else { Option(uri.getFragment()).getOrElse(new File(p).getName()) @@ -132,7 +131,7 @@ class ClientBaseSuite extends FunSuite with Matchers { }) .mkString(",") - sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected)) + sparkConf.getOption(Client.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected)) } finally { Utils.deleteRecursively(tempDir) } @@ -141,34 +140,34 @@ class ClientBaseSuite extends FunSuite with Matchers { test("check access nns empty") { val sparkConf = new SparkConf() sparkConf.set("spark.yarn.access.namenodes", "") - val nns = ClientBase.getNameNodesToAccess(sparkConf) + val nns = Client.getNameNodesToAccess(sparkConf) nns should be(Set()) } test("check access nns unset") { val sparkConf = new SparkConf() - val nns = ClientBase.getNameNodesToAccess(sparkConf) + val nns = Client.getNameNodesToAccess(sparkConf) nns should be(Set()) } test("check access nns") { val sparkConf = new SparkConf() sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032") - val nns = ClientBase.getNameNodesToAccess(sparkConf) + val nns = Client.getNameNodesToAccess(sparkConf) nns should be(Set(new Path("hdfs://nn1:8032"))) } test("check access nns space") { val sparkConf = new SparkConf() sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ") - val nns = ClientBase.getNameNodesToAccess(sparkConf) + val nns = Client.getNameNodesToAccess(sparkConf) nns should be(Set(new Path("hdfs://nn1:8032"))) } test("check 
access two nns") { val sparkConf = new SparkConf() sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032") - val nns = ClientBase.getNameNodesToAccess(sparkConf) + val nns = Client.getNameNodesToAccess(sparkConf) nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032"))) } @@ -176,7 +175,7 @@ class ClientBaseSuite extends FunSuite with Matchers { val hadoopConf = new Configuration() hadoopConf.set("yarn.resourcemanager.address", "myrm:8033") hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM") - val renewer = ClientBase.getTokenRenewer(hadoopConf) + val renewer = Client.getTokenRenewer(hadoopConf) renewer should be ("yarn/myrm:8032@SPARKTEST.COM") } @@ -184,7 +183,7 @@ class ClientBaseSuite extends FunSuite with Matchers { val hadoopConf = new Configuration() val caught = intercept[SparkException] { - ClientBase.getTokenRenewer(hadoopConf) + Client.getTokenRenewer(hadoopConf) } assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer") } @@ -218,7 +217,7 @@ class ClientBaseSuite extends FunSuite with Matchers { def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) { val conf = new Configuration - m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") } + m.foreach { case (k, v) => conf.set(k, v, "ClientSpec") } testCode(conf) } @@ -242,15 +241,4 @@ class ClientBaseSuite extends FunSuite with Matchers { }.toOption.getOrElse(defaults) } - private class DummyClient( - val args: ClientArguments, - val hadoopConf: Configuration, - val sparkConf: SparkConf, - val yarnConf: YarnConfiguration) extends ClientBase { - override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ??? - override def submitApplication(): ApplicationId = ??? - override def getApplicationReport(appId: ApplicationId): ApplicationReport = ??? - override def getClientToken(report: ApplicationReport): String = ??? - } - } |