aboutsummaryrefslogtreecommitdiff
path: root/yarn/src/test
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2015-01-08 09:25:43 -0800
committerAndrew Or <andrew@databricks.com>2015-01-08 09:25:43 -0800
commit8d45834debc6986e61831d0d6e982d5528dccc51 (patch)
tree4233ccff0dab1e94e2b20c953d23669426ae6ecd /yarn/src/test
parentc08238570c4aa53b0bed24d3304ce104e9bd65ce (diff)
downloadspark-8d45834debc6986e61831d0d6e982d5528dccc51.tar.gz
spark-8d45834debc6986e61831d0d6e982d5528dccc51.tar.bz2
spark-8d45834debc6986e61831d0d6e982d5528dccc51.zip
SPARK-5087. [YARN] Merge yarn.Client and yarn.ClientBase
Author: Sandy Ryza <sandy@cloudera.com> Closes #3896 from sryza/sandy-spark-5087 and squashes the following commits: 65611d0 [Sandy Ryza] Review feedback 3294176 [Sandy Ryza] SPARK-5087. [YARN] Merge yarn.Client and yarn.ClientBase
Diffstat (limited to 'yarn/src/test')
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala (renamed from yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala)64
1 files changed, 26 insertions, 38 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 17b79ae1d8..aad50015b7 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -41,38 +41,38 @@ import scala.util.Try
import org.apache.spark.{SparkException, SparkConf}
import org.apache.spark.util.Utils
-class ClientBaseSuite extends FunSuite with Matchers {
+class ClientSuite extends FunSuite with Matchers {
test("default Yarn application classpath") {
- ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
+ Client.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
}
test("default MR application classpath") {
- ClientBase.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
+ Client.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
}
test("resultant classpath for an application that defines a classpath for YARN") {
withAppConf(Fixtures.mapYARNAppConf) { conf =>
val env = newEnv
- ClientBase.populateHadoopClasspath(conf, env)
+ Client.populateHadoopClasspath(conf, env)
classpath(env) should be(
- flatten(Fixtures.knownYARNAppCP, ClientBase.getDefaultMRApplicationClasspath))
+ flatten(Fixtures.knownYARNAppCP, Client.getDefaultMRApplicationClasspath))
}
}
test("resultant classpath for an application that defines a classpath for MR") {
withAppConf(Fixtures.mapMRAppConf) { conf =>
val env = newEnv
- ClientBase.populateHadoopClasspath(conf, env)
+ Client.populateHadoopClasspath(conf, env)
classpath(env) should be(
- flatten(ClientBase.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
+ flatten(Client.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
}
}
test("resultant classpath for an application that defines both classpaths, YARN and MR") {
withAppConf(Fixtures.mapAppConf) { conf =>
val env = newEnv
- ClientBase.populateHadoopClasspath(conf, env)
+ Client.populateHadoopClasspath(conf, env)
classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
}
}
@@ -83,16 +83,16 @@ class ClientBaseSuite extends FunSuite with Matchers {
test("Local jar URIs") {
val conf = new Configuration()
- val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
+ val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
- ClientBase.populateClasspath(args, conf, sparkConf, env)
+ Client.populateClasspath(args, conf, sparkConf, env)
val cp = env("CLASSPATH").split(File.pathSeparator)
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
val uri = new URI(entry)
- if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) {
+ if (Client.LOCAL_SCHEME.equals(uri.getScheme())) {
cp should contain (uri.getPath())
} else {
cp should not contain (uri.getPath())
@@ -100,31 +100,30 @@ class ClientBaseSuite extends FunSuite with Matchers {
})
cp should contain (Environment.PWD.$())
cp should contain (s"${Environment.PWD.$()}${File.separator}*")
- cp should not contain (ClientBase.SPARK_JAR)
- cp should not contain (ClientBase.APP_JAR)
+ cp should not contain (Client.SPARK_JAR)
+ cp should not contain (Client.APP_JAR)
}
test("Jar path propagation through SparkConf") {
val conf = new Configuration()
- val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)
- val yarnConf = new YarnConfiguration()
+ val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
- val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
+ val client = spy(new Client(args, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
any(classOf[Path]), anyShort(), anyBoolean())
val tempDir = Utils.createTempDir()
try {
client.prepareLocalResources(tempDir.getAbsolutePath())
- sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER))
+ sparkConf.getOption(Client.CONF_SPARK_USER_JAR) should be (Some(USER))
// The non-local path should be propagated by name only, since it will end up in the app's
// staging dir.
val expected = ADDED.split(",")
.map(p => {
val uri = new URI(p)
- if (ClientBase.LOCAL_SCHEME == uri.getScheme()) {
+ if (Client.LOCAL_SCHEME == uri.getScheme()) {
p
} else {
Option(uri.getFragment()).getOrElse(new File(p).getName())
@@ -132,7 +131,7 @@ class ClientBaseSuite extends FunSuite with Matchers {
})
.mkString(",")
- sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected))
+ sparkConf.getOption(Client.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected))
} finally {
Utils.deleteRecursively(tempDir)
}
@@ -141,34 +140,34 @@ class ClientBaseSuite extends FunSuite with Matchers {
test("check access nns empty") {
val sparkConf = new SparkConf()
sparkConf.set("spark.yarn.access.namenodes", "")
- val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ val nns = Client.getNameNodesToAccess(sparkConf)
nns should be(Set())
}
test("check access nns unset") {
val sparkConf = new SparkConf()
- val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ val nns = Client.getNameNodesToAccess(sparkConf)
nns should be(Set())
}
test("check access nns") {
val sparkConf = new SparkConf()
sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
- val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ val nns = Client.getNameNodesToAccess(sparkConf)
nns should be(Set(new Path("hdfs://nn1:8032")))
}
test("check access nns space") {
val sparkConf = new SparkConf()
sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
- val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ val nns = Client.getNameNodesToAccess(sparkConf)
nns should be(Set(new Path("hdfs://nn1:8032")))
}
test("check access two nns") {
val sparkConf = new SparkConf()
sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032,hdfs://nn2:8032")
- val nns = ClientBase.getNameNodesToAccess(sparkConf)
+ val nns = Client.getNameNodesToAccess(sparkConf)
nns should be(Set(new Path("hdfs://nn1:8032"), new Path("hdfs://nn2:8032")))
}
@@ -176,7 +175,7 @@ class ClientBaseSuite extends FunSuite with Matchers {
val hadoopConf = new Configuration()
hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val renewer = ClientBase.getTokenRenewer(hadoopConf)
+ val renewer = Client.getTokenRenewer(hadoopConf)
renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
}
@@ -184,7 +183,7 @@ class ClientBaseSuite extends FunSuite with Matchers {
val hadoopConf = new Configuration()
val caught =
intercept[SparkException] {
- ClientBase.getTokenRenewer(hadoopConf)
+ Client.getTokenRenewer(hadoopConf)
}
assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
}
@@ -218,7 +217,7 @@ class ClientBaseSuite extends FunSuite with Matchers {
def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) {
val conf = new Configuration
- m.foreach { case (k, v) => conf.set(k, v, "ClientBaseSpec") }
+ m.foreach { case (k, v) => conf.set(k, v, "ClientSpec") }
testCode(conf)
}
@@ -242,15 +241,4 @@ class ClientBaseSuite extends FunSuite with Matchers {
}.toOption.getOrElse(defaults)
}
- private class DummyClient(
- val args: ClientArguments,
- val hadoopConf: Configuration,
- val sparkConf: SparkConf,
- val yarnConf: YarnConfiguration) extends ClientBase {
- override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ???
- override def submitApplication(): ApplicationId = ???
- override def getApplicationReport(appId: ApplicationId): ApplicationReport = ???
- override def getClientToken(report: ApplicationReport): String = ???
- }
-
}