diff options
Diffstat (limited to 'yarn/src/test')
3 files changed, 120 insertions, 39 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 272e2454da..b12e506033 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -34,6 +34,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers} import org.scalatest.concurrent.Eventually._ import org.apache.spark._ +import org.apache.spark.deploy.yarn.config._ import org.apache.spark.launcher._ import org.apache.spark.util.Utils @@ -202,7 +203,7 @@ abstract class BaseYarnClusterSuite extraClassPath: Seq[String] = Nil, extraConf: Map[String, String] = Map()): String = { val props = new Properties() - props.put("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath()) + props.put(SPARK_JARS.key, "local:" + fakeSparkJar.getAbsolutePath()) val testClasspath = new TestClasspathBuilder() .buildClassPath( diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index b57c179d89..24472e006b 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -36,17 +36,19 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.YarnClientApplication import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.Records -import org.mockito.Matchers._ +import org.mockito.Matchers.{eq => meq, _} import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterAll, Matchers} -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils} import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.util.{ResetSystemProperties, Utils} +import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils} class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll with ResetSystemProperties { + import Client._ + var oldSystemProperties: Properties = null override def beforeAll(): Unit = { @@ -65,35 +67,35 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll } test("default Yarn application classpath") { - Client.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) + getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) } test("default MR application classpath") { - Client.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) + getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) } test("resultant classpath for an application that defines a classpath for YARN") { withAppConf(Fixtures.mapYARNAppConf) { conf => val env = newEnv - Client.populateHadoopClasspath(conf, env) + populateHadoopClasspath(conf, env) classpath(env) should be( - flatten(Fixtures.knownYARNAppCP, Client.getDefaultMRApplicationClasspath)) + flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath)) } } test("resultant classpath for an application that defines a classpath for MR") { withAppConf(Fixtures.mapMRAppConf) { conf => val env = newEnv - Client.populateHadoopClasspath(conf, env) + populateHadoopClasspath(conf, env) classpath(env) should be( - flatten(Client.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) + flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) } } test("resultant classpath for an application that defines both classpaths, YARN and MR") { withAppConf(Fixtures.mapAppConf) { conf => val env = newEnv - Client.populateHadoopClasspath(conf, env) + populateHadoopClasspath(conf, env) classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) } } @@ -102,47 +104,43 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll private val USER = "local:/userJar" private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" + private val PWD = + if (classOf[Environment].getMethods().exists(_.getName == "$$")) { + "{{PWD}}" + } else if (Utils.isWindows) { + "%PWD%" + } else { + Environment.PWD.$() + } + test("Local jar URIs") { val conf = new Configuration() val sparkConf = new SparkConf() - .set(SPARK_JAR, SPARK) + .set(SPARK_JARS, Seq(SPARK)) .set(USER_CLASS_PATH_FIRST, true) val env = new MutableHashMap[String, String]() val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - Client.populateClasspath(args, conf, sparkConf, env, true) + populateClasspath(args, conf, sparkConf, env, true) val cp = env("CLASSPATH").split(":|;|<CPS>") s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => val uri = new URI(entry) - if (Client.LOCAL_SCHEME.equals(uri.getScheme())) { + if (LOCAL_SCHEME.equals(uri.getScheme())) { cp should contain (uri.getPath()) } else { cp should not contain (uri.getPath()) } }) - val pwdVar = - if (classOf[Environment].getMethods().exists(_.getName == "$$")) { - "{{PWD}}" - } else if (Utils.isWindows) { - "%PWD%" - } else { - Environment.PWD.$() - } - cp should contain(pwdVar) - cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}") - cp should not contain (Client.SPARK_JAR_NAME) - cp should not contain (Client.APP_JAR_NAME) + cp should contain(PWD) + cp should contain (s"$PWD${Path.SEPARATOR}${LOCALIZED_CONF_DIR}") + cp should not contain (APP_JAR) } test("Jar path propagation through SparkConf") { - val conf = new Configuration() - val sparkConf = new SparkConf().set(SPARK_JAR, SPARK) - val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - - val client = spy(new Client(args, conf, sparkConf)) - doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), - any(classOf[Path]), anyShort()) + val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK)) + val client = createClient(sparkConf, + args = Array("--jar", USER, "--addJars", ADDED)) val tempDir = Utils.createTempDir() try { @@ -154,7 +152,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val expected = ADDED.split(",") .map(p => { val uri = new URI(p) - if (Client.LOCAL_SCHEME == uri.getScheme()) { + if (LOCAL_SCHEME == uri.getScheme()) { p } else { Option(uri.getFragment()).getOrElse(new File(p).getName()) @@ -171,16 +169,16 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll test("Cluster path translation") { val conf = new Configuration() val sparkConf = new SparkConf() - .set(SPARK_JAR.key, "local:/localPath/spark.jar") + .set(SPARK_JARS, Seq("local:/localPath/spark.jar")) .set(GATEWAY_ROOT_PATH, "/localPath") .set(REPLACEMENT_ROOT_PATH, "/remotePath") - Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath") - Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be ( + getClusterPath(sparkConf, "/localPath") should be ("/remotePath") + getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be ( "/remotePath/1:/remotePath/2") val env = new MutableHashMap[String, String]() - Client.populateClasspath(null, conf, sparkConf, env, false, + populateClasspath(null, conf, sparkConf, env, false, extraClassPath = Some("/localPath/my1.jar")) val cp = classpath(env) cp should contain ("/remotePath/spark.jar") @@ -220,6 +218,70 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll appContext.getMaxAppAttempts should be (42) } + test("spark.yarn.jars with multiple paths and globs") { + val libs = Utils.createTempDir() + val single = Utils.createTempDir() + val jar1 = TestUtils.createJarWithFiles(Map(), libs) + val jar2 = TestUtils.createJarWithFiles(Map(), libs) + val jar3 = TestUtils.createJarWithFiles(Map(), single) + val jar4 = TestUtils.createJarWithFiles(Map(), single) + + val jarsConf = Seq( + s"${libs.getAbsolutePath()}/*", + jar3.getPath(), + s"local:${jar4.getPath()}", + s"local:${single.getAbsolutePath()}/*") + + val sparkConf = new SparkConf().set(SPARK_JARS, jarsConf) + val client = createClient(sparkConf) + + val tempDir = Utils.createTempDir() + client.prepareLocalResources(tempDir.getAbsolutePath(), Nil) + + assert(sparkConf.get(SPARK_JARS) === + Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*"))) + + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort()) + + val cp = classpath(client) + cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) + cp should not contain (jar3.getPath()) + cp should contain (jar4.getPath()) + cp should contain (buildPath(single.getAbsolutePath(), "*")) + } + + test("distribute jars archive") { + val temp = Utils.createTempDir() + val archive = TestUtils.createJarWithFiles(Map(), temp) + + val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath()) + val client = createClient(sparkConf) + client.prepareLocalResources(temp.getAbsolutePath(), Nil) + + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort()) + classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) + + sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath()) + intercept[IllegalArgumentException] { + client.prepareLocalResources(temp.getAbsolutePath(), Nil) + } + } + + test("distribute local spark jars") { + val temp = Utils.createTempDir() + val jarsDir = new File(temp, "lib") + assert(jarsDir.mkdir()) + val jar = TestUtils.createJarWithFiles(Map(), jarsDir) + + val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath())) + val client = createClient(sparkConf) + client.prepareLocalResources(temp.getAbsolutePath(), Nil) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort()) + classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) + } + object Fixtures { val knownDefYarnAppCP: Seq[String] = @@ -280,4 +342,21 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll }.toOption.getOrElse(defaults) } + private def createClient( + sparkConf: SparkConf, + conf: Configuration = new Configuration(), + args: Array[String] = Array()): Client = { + val clientArgs = new ClientArguments(args, sparkConf) + val client = spy(new Client(clientArgs, conf, sparkConf)) + doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), + any(classOf[Path]), anyShort()) + client + } + + private def classpath(client: Client): Array[String] = { + val env = new MutableHashMap[String, String]() + populateClasspath(null, client.hadoopConf, client.sparkConf, env, false) + classpath(env) + } + } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 1dd2f93bb7..0587444a33 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -29,6 +29,7 @@ import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterEach, Matchers} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.yarn.config._ import org.apache.spark.deploy.yarn.YarnAllocator._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.rpc.RpcEndpointRef @@ -55,7 +56,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val sparkConf = new SparkConf() sparkConf.set("spark.driver.host", "localhost") sparkConf.set("spark.driver.port", "4040") - sparkConf.set("spark.yarn.jar", "notarealjar.jar") + sparkConf.set(SPARK_JARS, Seq("notarealjar.jar")) sparkConf.set("spark.yarn.launchContainers", "false") val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0) |