author     Marcelo Vanzin <vanzin@cloudera.com>    2016-03-11 07:54:57 -0600
committer  Tom Graves <tgraves@yahoo-inc.com>      2016-03-11 07:54:57 -0600
commit     07f1c5447753a3d593cd6ececfcb03c11b1cf8ff (patch)
tree       74c4c9f81e64cc1ddde0b1c5e554a836808609e1 /yarn/src/test/scala/org
parent     8fff0f92a4aca90b62c6e272eabcbb0257ba38d5 (diff)
[SPARK-13577][YARN] Allow Spark jar to be multiple jars, archive.
In preparation for the demise of assemblies, this change allows the YARN backend to use multiple jars and globs as the "Spark jar". The config option has been renamed to "spark.yarn.jars" to reflect that. A second option, "spark.yarn.archive", was also added; if set, it takes precedence and uploads an archive expected to contain the jar files with the Spark code and its dependencies.

Existing deployments should keep working, mostly. This change drops support for the "SPARK_JAR" environment variable, and no longer falls back to "jarOfClass" when no configuration is set; instead it falls back to finding files under SPARK_HOME. This should be fine, since "jarOfClass" probably wouldn't work unless you were using spark-submit anyway.

Tested with the unit tests and by trying the different config options on a YARN cluster.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #11500 from vanzin/SPARK-13577.
Diffstat (limited to 'yarn/src/test/scala/org')
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala |   3
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala          | 153
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala   |   3
3 files changed, 120 insertions(+), 39 deletions(-)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 272e2454da..b12e506033 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -34,6 +34,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers}
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.launcher._
import org.apache.spark.util.Utils
@@ -202,7 +203,7 @@ abstract class BaseYarnClusterSuite
extraClassPath: Seq[String] = Nil,
extraConf: Map[String, String] = Map()): String = {
val props = new Properties()
- props.put("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+ props.put(SPARK_JARS.key, "local:" + fakeSparkJar.getAbsolutePath())
val testClasspath = new TestClasspathBuilder()
.buildClassPath(
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index b57c179d89..24472e006b 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -36,17 +36,19 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.YarnClientApplication
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
-import org.mockito.Matchers._
+import org.mockito.Matchers.{eq => meq, _}
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers}
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.util.{ResetSystemProperties, Utils}
+import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}
class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
with ResetSystemProperties {
+ import Client._
+
var oldSystemProperties: Properties = null
override def beforeAll(): Unit = {
@@ -65,35 +67,35 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
test("default Yarn application classpath") {
- Client.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
+ getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
}
test("default MR application classpath") {
- Client.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
+ getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
}
test("resultant classpath for an application that defines a classpath for YARN") {
withAppConf(Fixtures.mapYARNAppConf) { conf =>
val env = newEnv
- Client.populateHadoopClasspath(conf, env)
+ populateHadoopClasspath(conf, env)
classpath(env) should be(
- flatten(Fixtures.knownYARNAppCP, Client.getDefaultMRApplicationClasspath))
+ flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath))
}
}
test("resultant classpath for an application that defines a classpath for MR") {
withAppConf(Fixtures.mapMRAppConf) { conf =>
val env = newEnv
- Client.populateHadoopClasspath(conf, env)
+ populateHadoopClasspath(conf, env)
classpath(env) should be(
- flatten(Client.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
+ flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
}
}
test("resultant classpath for an application that defines both classpaths, YARN and MR") {
withAppConf(Fixtures.mapAppConf) { conf =>
val env = newEnv
- Client.populateHadoopClasspath(conf, env)
+ populateHadoopClasspath(conf, env)
classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
}
}
@@ -102,47 +104,43 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
private val USER = "local:/userJar"
private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
+ private val PWD =
+ if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+ "{{PWD}}"
+ } else if (Utils.isWindows) {
+ "%PWD%"
+ } else {
+ Environment.PWD.$()
+ }
+
test("Local jar URIs") {
val conf = new Configuration()
val sparkConf = new SparkConf()
- .set(SPARK_JAR, SPARK)
+ .set(SPARK_JARS, Seq(SPARK))
.set(USER_CLASS_PATH_FIRST, true)
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
- Client.populateClasspath(args, conf, sparkConf, env, true)
+ populateClasspath(args, conf, sparkConf, env, true)
val cp = env("CLASSPATH").split(":|;|<CPS>")
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
val uri = new URI(entry)
- if (Client.LOCAL_SCHEME.equals(uri.getScheme())) {
+ if (LOCAL_SCHEME.equals(uri.getScheme())) {
cp should contain (uri.getPath())
} else {
cp should not contain (uri.getPath())
}
})
- val pwdVar =
- if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
- "{{PWD}}"
- } else if (Utils.isWindows) {
- "%PWD%"
- } else {
- Environment.PWD.$()
- }
- cp should contain(pwdVar)
- cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
- cp should not contain (Client.SPARK_JAR_NAME)
- cp should not contain (Client.APP_JAR_NAME)
+ cp should contain(PWD)
+ cp should contain (s"$PWD${Path.SEPARATOR}${LOCALIZED_CONF_DIR}")
+ cp should not contain (APP_JAR)
}
test("Jar path propagation through SparkConf") {
- val conf = new Configuration()
- val sparkConf = new SparkConf().set(SPARK_JAR, SPARK)
- val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
-
- val client = spy(new Client(args, conf, sparkConf))
- doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
- any(classOf[Path]), anyShort())
+ val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK))
+ val client = createClient(sparkConf,
+ args = Array("--jar", USER, "--addJars", ADDED))
val tempDir = Utils.createTempDir()
try {
@@ -154,7 +152,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val expected = ADDED.split(",")
.map(p => {
val uri = new URI(p)
- if (Client.LOCAL_SCHEME == uri.getScheme()) {
+ if (LOCAL_SCHEME == uri.getScheme()) {
p
} else {
Option(uri.getFragment()).getOrElse(new File(p).getName())
@@ -171,16 +169,16 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
test("Cluster path translation") {
val conf = new Configuration()
val sparkConf = new SparkConf()
- .set(SPARK_JAR.key, "local:/localPath/spark.jar")
+ .set(SPARK_JARS, Seq("local:/localPath/spark.jar"))
.set(GATEWAY_ROOT_PATH, "/localPath")
.set(REPLACEMENT_ROOT_PATH, "/remotePath")
- Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
- Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
+ getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
+ getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
"/remotePath/1:/remotePath/2")
val env = new MutableHashMap[String, String]()
- Client.populateClasspath(null, conf, sparkConf, env, false,
+ populateClasspath(null, conf, sparkConf, env, false,
extraClassPath = Some("/localPath/my1.jar"))
val cp = classpath(env)
cp should contain ("/remotePath/spark.jar")
@@ -220,6 +218,70 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
appContext.getMaxAppAttempts should be (42)
}
+ test("spark.yarn.jars with multiple paths and globs") {
+ val libs = Utils.createTempDir()
+ val single = Utils.createTempDir()
+ val jar1 = TestUtils.createJarWithFiles(Map(), libs)
+ val jar2 = TestUtils.createJarWithFiles(Map(), libs)
+ val jar3 = TestUtils.createJarWithFiles(Map(), single)
+ val jar4 = TestUtils.createJarWithFiles(Map(), single)
+
+ val jarsConf = Seq(
+ s"${libs.getAbsolutePath()}/*",
+ jar3.getPath(),
+ s"local:${jar4.getPath()}",
+ s"local:${single.getAbsolutePath()}/*")
+
+ val sparkConf = new SparkConf().set(SPARK_JARS, jarsConf)
+ val client = createClient(sparkConf)
+
+ val tempDir = Utils.createTempDir()
+ client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+
+ assert(sparkConf.get(SPARK_JARS) ===
+ Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
+
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort())
+
+ val cp = classpath(client)
+ cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+ cp should not contain (jar3.getPath())
+ cp should contain (jar4.getPath())
+ cp should contain (buildPath(single.getAbsolutePath(), "*"))
+ }
+
+ test("distribute jars archive") {
+ val temp = Utils.createTempDir()
+ val archive = TestUtils.createJarWithFiles(Map(), temp)
+
+ val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath())
+ val client = createClient(sparkConf)
+ client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
+ classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+
+ sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
+ intercept[IllegalArgumentException] {
+ client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+ }
+ }
+
+ test("distribute local spark jars") {
+ val temp = Utils.createTempDir()
+ val jarsDir = new File(temp, "lib")
+ assert(jarsDir.mkdir())
+ val jar = TestUtils.createJarWithFiles(Map(), jarsDir)
+
+ val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
+ val client = createClient(sparkConf)
+ client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
+ classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+ }
+
object Fixtures {
val knownDefYarnAppCP: Seq[String] =
@@ -280,4 +342,21 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}.toOption.getOrElse(defaults)
}
+ private def createClient(
+ sparkConf: SparkConf,
+ conf: Configuration = new Configuration(),
+ args: Array[String] = Array()): Client = {
+ val clientArgs = new ClientArguments(args, sparkConf)
+ val client = spy(new Client(clientArgs, conf, sparkConf))
+ doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
+ any(classOf[Path]), anyShort())
+ client
+ }
+
+ private def classpath(client: Client): Array[String] = {
+ val env = new MutableHashMap[String, String]()
+ populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+ classpath(env)
+ }
+
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 1dd2f93bb7..0587444a33 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -29,6 +29,7 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.RpcEndpointRef
@@ -55,7 +56,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val sparkConf = new SparkConf()
sparkConf.set("spark.driver.host", "localhost")
sparkConf.set("spark.driver.port", "4040")
- sparkConf.set("spark.yarn.jar", "notarealjar.jar")
+ sparkConf.set(SPARK_JARS, Seq("notarealjar.jar"))
sparkConf.set("spark.yarn.launchContainers", "false")
val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)