Diffstat (limited to 'yarn')
-rw-r--r-- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala              | 108
-rw-r--r-- yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala              |  10
-rw-r--r-- yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala |   3
-rw-r--r-- yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala          | 153
-rw-r--r-- yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala   |   3
5 files changed, 206 insertions(+), 71 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6ca9669002..0b5ceb768c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -423,7 +423,63 @@ private[spark] class Client(
}
/**
- * Copy the given main resource to the distributed cache if the scheme is not "local".
+ * Add Spark to the cache. There are two settings that control what files to add to the cache:
+ * - if a Spark archive is defined, use the archive. The archive is expected to contain
+ * jar files at its root directory.
+ * - if a list of jars is provided, filter the non-local ones, resolve globs, and
+ * add the found files to the cache.
+ *
+ * Note that the archive cannot be a "local" URI. If none of the above settings are found,
+ * then upload all files found in $SPARK_HOME/jars.
+ *
+ * TODO: currently the code looks in $SPARK_HOME/lib while the work to replace assemblies
+ * with a directory full of jars is ongoing.
+ */
+ val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
+ if (sparkArchive.isDefined) {
+ val archive = sparkArchive.get
+ require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
+ distribute(Utils.resolveURI(archive).toString,
+ resType = LocalResourceType.ARCHIVE,
+ destName = Some(LOCALIZED_LIB_DIR))
+ } else {
+ sparkConf.get(SPARK_JARS) match {
+ case Some(jars) =>
+ // Split the jar list into local and non-local URIs, resolving globs and uploading the non-local ones.
+ val localJars = new ArrayBuffer[String]()
+ jars.foreach { jar =>
+ if (!isLocalUri(jar)) {
+ val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+ val pathFs = FileSystem.get(path.toUri(), hadoopConf)
+ pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
+ distribute(entry.getPath().toUri().toString(),
+ targetDir = Some(LOCALIZED_LIB_DIR))
+ }
+ } else {
+ localJars += jar
+ }
+ }
+
+ // Propagate the local URIs to the containers using the configuration.
+ sparkConf.set(SPARK_JARS, localJars)
+
+ case None =>
+ // No configuration, so fall back to uploading local jar files.
+ logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " +
+ "to uploading libraries under SPARK_HOME.")
+ val jarsDir = new File(sparkConf.getenv("SPARK_HOME"), "lib")
+ if (jarsDir.isDirectory()) {
+ jarsDir.listFiles().foreach { f =>
+ if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) {
+ distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR))
+ }
+ }
+ }
+ }
+ }
+
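For reference, the selection order implemented above is: spark.yarn.archive wins if set, then spark.yarn.jars, then the $SPARK_HOME fallback. A condensed, self-contained sketch of that precedence (illustrative only; the real logic lives in the patch above, and the distribute side effects are reduced to return values here):

    // Sketch: archive > jars > $SPARK_HOME fallback.
    def selectSparkFiles(archive: Option[String], jars: Option[Seq[String]]): String =
      (archive, jars) match {
        case (Some(a), _) =>
          require(!a.startsWith("local:"), "spark.yarn.archive cannot be a local URI.")
          s"distribute archive $a; containers see its jars under __spark_libs__/"
        case (None, Some(js)) =>
          val (localJars, toUpload) = js.partition(_.startsWith("local:"))
          s"upload ${toUpload.size} globbed entries; keep ${localJars.size} local: URIs in the conf"
        case (None, None) =>
          "upload every .jar under $SPARK_HOME/lib (see the TODO above)"
      }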
+ /**
+ * Copy a few resources to the distributed cache if their scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
* Each resource is represented by a 3-tuple of:
* (1) destination resource name,
@@ -431,8 +487,7 @@ private[spark] class Client(
* (3) Spark property key to set if the scheme is not local
*/
List(
- (SPARK_JAR_NAME, sparkJar(sparkConf), SPARK_JAR.key),
- (APP_JAR_NAME, args.userJar, APP_JAR.key),
+ (APP_JAR_NAME, args.userJar, APP_JAR),
("log4j.properties", oldLog4jConf.orNull, null)
).foreach { case (destName, path, confKey) =>
if (path != null && !path.trim().isEmpty()) {
@@ -1062,8 +1117,7 @@ object Client extends Logging {
new Client(args, sparkConf).run()
}
- // Alias for the Spark assembly jar and the user jar
- val SPARK_JAR_NAME: String = "__spark__.jar"
+ // Alias for the user jar
val APP_JAR_NAME: String = "__app__.jar"
// URI scheme that identifies local resources
@@ -1072,8 +1126,6 @@ object Client extends Logging {
// Staging directory for any temporary jars or files
val SPARK_STAGING: String = ".sparkStaging"
- // Location of any user-defined Spark jars
- val ENV_SPARK_JAR = "SPARK_JAR"
// Staging directory is private! -> rwx--------
val STAGING_DIR_PERMISSION: FsPermission =
@@ -1095,28 +1147,8 @@ object Client extends Logging {
// Subdirectory where the user's python files (not archives) will be placed.
val LOCALIZED_PYTHON_DIR = "__pyfiles__"
- /**
- * Find the user-defined Spark jar if configured, or return the jar containing this
- * class if not.
- *
- * This method first looks in the SparkConf object for the spark.yarn.jar key, and in the
- * user environment if that is not found (for backwards compatibility).
- */
- private def sparkJar(conf: SparkConf): String = {
- conf.get(SPARK_JAR).getOrElse(
- if (System.getenv(ENV_SPARK_JAR) != null) {
- logWarning(
- s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " +
- s"in favor of the ${SPARK_JAR.key} configuration variable.")
- System.getenv(ENV_SPARK_JAR)
- } else {
- SparkContext.jarOfClass(this.getClass).getOrElse(throw new SparkException("Could not "
- + "find jar containing Spark classes. The jar can be defined using the "
- + s"${SPARK_JAR.key} configuration option. If testing Spark, either set that option "
- + "or make sure SPARK_PREPEND_CLASSES is not set."))
- }
- )
- }
+ // Subdirectory where Spark libraries will be placed.
+ val LOCALIZED_LIB_DIR = "__spark_libs__"
/**
* Return the path to the given application's staging directory.
@@ -1236,7 +1268,18 @@ object Client extends Logging {
addFileToClasspath(sparkConf, conf, x, null, env)
}
}
- addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR_NAME, env)
+
+ // Add the Spark jars to the classpath, depending on how they were distributed.
+ addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+ LOCALIZED_LIB_DIR, "*"), env)
+ if (sparkConf.get(SPARK_ARCHIVE).isEmpty) {
+ sparkConf.get(SPARK_JARS).foreach { jars =>
+ jars.filter(isLocalUri).foreach { jar =>
+ val uri = new URI(jar)
+ addClasspathEntry(getClusterPath(sparkConf, uri.getPath()), env)
+ }
+ }
+ }
+
populateHadoopClasspath(conf, env)
sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
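The getClusterPath calls above rewrite gateway-local paths for the cluster, driven by the GATEWAY_ROOT_PATH and REPLACEMENT_ROOT_PATH entries exercised in the "Cluster path translation" test below. A minimal model of that substitution (a sketch, not the actual implementation):

    // Replace the gateway prefix with the cluster-side prefix when both are
    // configured; otherwise return the path unchanged.
    def clusterPath(path: String, gateway: Option[String], replacement: Option[String]): String =
      (gateway, replacement) match {
        case (Some(g), Some(r)) => path.replace(g, r)
        case _                  => path
      }

    clusterPath("/localPath/spark.jar", Some("/localPath"), Some("/remotePath"))
    // => "/remotePath/spark.jar"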
@@ -1392,4 +1435,9 @@ object Client extends Logging {
components.mkString(Path.SEPARATOR)
}
+ /** Returns whether the URI is a "local:" URI. */
+ def isLocalUri(uri: String): Boolean = {
+ uri.startsWith(s"$LOCAL_SCHEME:")
+ }
+
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 06c1be9bf0..10cd6d00b0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -72,11 +72,17 @@ package object config {
/* File distribution. */
- private[spark] val SPARK_JAR = ConfigBuilder("spark.yarn.jar")
- .doc("Location of the Spark jar to use.")
+ private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive")
+ .doc("Location of archive containing jars files with Spark classes.")
.stringConf
.optional
+ private[spark] val SPARK_JARS = ConfigBuilder("spark.yarn.jars")
+ .doc("Location of jars containing Spark classes.")
+ .stringConf
+ .toSequence
+ .optional
+
private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives")
.stringConf
.optional
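For illustration, the two new entries would typically be set as follows (values are made up; per the .toSequence above, spark.yarn.jars is parsed as a comma-separated list):

    import org.apache.spark.SparkConf

    // Option 1: a single archive whose root directory contains the Spark jars.
    val conf = new SparkConf()
      .set("spark.yarn.archive", "hdfs:///apps/spark/spark-libs.zip")

    // Option 2: an explicit jar list; globs are resolved, and "local:" entries
    // are not uploaded (they must already exist on every node).
    val alt = new SparkConf()
      .set("spark.yarn.jars", "hdfs:///apps/spark/jars/*,local:/opt/spark/extra.jar")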
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 272e2454da..b12e506033 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -34,6 +34,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers}
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.launcher._
import org.apache.spark.util.Utils
@@ -202,7 +203,7 @@ abstract class BaseYarnClusterSuite
extraClassPath: Seq[String] = Nil,
extraConf: Map[String, String] = Map()): String = {
val props = new Properties()
- props.put("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+ props.put(SPARK_JARS.key, "local:" + fakeSparkJar.getAbsolutePath())
val testClasspath = new TestClasspathBuilder()
.buildClassPath(
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index b57c179d89..24472e006b 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -36,17 +36,19 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.YarnClientApplication
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
-import org.mockito.Matchers._
+import org.mockito.Matchers.{eq => meq, _}
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers}
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils}
import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.util.{ResetSystemProperties, Utils}
+import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils}
class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
with ResetSystemProperties {
+ import Client._
+
var oldSystemProperties: Properties = null
override def beforeAll(): Unit = {
@@ -65,35 +67,35 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
test("default Yarn application classpath") {
- Client.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
+ getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
}
test("default MR application classpath") {
- Client.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
+ getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
}
test("resultant classpath for an application that defines a classpath for YARN") {
withAppConf(Fixtures.mapYARNAppConf) { conf =>
val env = newEnv
- Client.populateHadoopClasspath(conf, env)
+ populateHadoopClasspath(conf, env)
classpath(env) should be(
- flatten(Fixtures.knownYARNAppCP, Client.getDefaultMRApplicationClasspath))
+ flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath))
}
}
test("resultant classpath for an application that defines a classpath for MR") {
withAppConf(Fixtures.mapMRAppConf) { conf =>
val env = newEnv
- Client.populateHadoopClasspath(conf, env)
+ populateHadoopClasspath(conf, env)
classpath(env) should be(
- flatten(Client.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
+ flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
}
}
test("resultant classpath for an application that defines both classpaths, YARN and MR") {
withAppConf(Fixtures.mapAppConf) { conf =>
val env = newEnv
- Client.populateHadoopClasspath(conf, env)
+ populateHadoopClasspath(conf, env)
classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
}
}
@@ -102,47 +104,43 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
private val USER = "local:/userJar"
private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
+ private val PWD =
+ if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+ "{{PWD}}"
+ } else if (Utils.isWindows) {
+ "%PWD%"
+ } else {
+ Environment.PWD.$()
+ }
+
test("Local jar URIs") {
val conf = new Configuration()
val sparkConf = new SparkConf()
- .set(SPARK_JAR, SPARK)
+ .set(SPARK_JARS, Seq(SPARK))
.set(USER_CLASS_PATH_FIRST, true)
val env = new MutableHashMap[String, String]()
val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
- Client.populateClasspath(args, conf, sparkConf, env, true)
+ populateClasspath(args, conf, sparkConf, env, true)
val cp = env("CLASSPATH").split(":|;|<CPS>")
s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
val uri = new URI(entry)
- if (Client.LOCAL_SCHEME.equals(uri.getScheme())) {
+ if (LOCAL_SCHEME.equals(uri.getScheme())) {
cp should contain (uri.getPath())
} else {
cp should not contain (uri.getPath())
}
})
- val pwdVar =
- if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
- "{{PWD}}"
- } else if (Utils.isWindows) {
- "%PWD%"
- } else {
- Environment.PWD.$()
- }
- cp should contain(pwdVar)
- cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}")
- cp should not contain (Client.SPARK_JAR_NAME)
- cp should not contain (Client.APP_JAR_NAME)
+ cp should contain(PWD)
+ cp should contain (s"$PWD${Path.SEPARATOR}${LOCALIZED_CONF_DIR}")
+ cp should not contain (APP_JAR_NAME)
}
test("Jar path propagation through SparkConf") {
- val conf = new Configuration()
- val sparkConf = new SparkConf().set(SPARK_JAR, SPARK)
- val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
-
- val client = spy(new Client(args, conf, sparkConf))
- doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
- any(classOf[Path]), anyShort())
+ val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK))
+ val client = createClient(sparkConf,
+ args = Array("--jar", USER, "--addJars", ADDED))
val tempDir = Utils.createTempDir()
try {
@@ -154,7 +152,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val expected = ADDED.split(",")
.map(p => {
val uri = new URI(p)
- if (Client.LOCAL_SCHEME == uri.getScheme()) {
+ if (LOCAL_SCHEME == uri.getScheme()) {
p
} else {
Option(uri.getFragment()).getOrElse(new File(p).getName())
@@ -171,16 +169,16 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
test("Cluster path translation") {
val conf = new Configuration()
val sparkConf = new SparkConf()
- .set(SPARK_JAR.key, "local:/localPath/spark.jar")
+ .set(SPARK_JARS, Seq("local:/localPath/spark.jar"))
.set(GATEWAY_ROOT_PATH, "/localPath")
.set(REPLACEMENT_ROOT_PATH, "/remotePath")
- Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
- Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
+ getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
+ getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
"/remotePath/1:/remotePath/2")
val env = new MutableHashMap[String, String]()
- Client.populateClasspath(null, conf, sparkConf, env, false,
+ populateClasspath(null, conf, sparkConf, env, false,
extraClassPath = Some("/localPath/my1.jar"))
val cp = classpath(env)
cp should contain ("/remotePath/spark.jar")
@@ -220,6 +218,70 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
appContext.getMaxAppAttempts should be (42)
}
+ test("spark.yarn.jars with multiple paths and globs") {
+ val libs = Utils.createTempDir()
+ val single = Utils.createTempDir()
+ val jar1 = TestUtils.createJarWithFiles(Map(), libs)
+ val jar2 = TestUtils.createJarWithFiles(Map(), libs)
+ val jar3 = TestUtils.createJarWithFiles(Map(), single)
+ val jar4 = TestUtils.createJarWithFiles(Map(), single)
+
+ val jarsConf = Seq(
+ s"${libs.getAbsolutePath()}/*",
+ jar3.getPath(),
+ s"local:${jar4.getPath()}",
+ s"local:${single.getAbsolutePath()}/*")
+
+ val sparkConf = new SparkConf().set(SPARK_JARS, jarsConf)
+ val client = createClient(sparkConf)
+
+ val tempDir = Utils.createTempDir()
+ client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+
+ assert(sparkConf.get(SPARK_JARS) ===
+ Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
+
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort())
+
+ val cp = classpath(client)
+ cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+ cp should not contain (jar3.getPath())
+ cp should contain (jar4.getPath())
+ cp should contain (buildPath(single.getAbsolutePath(), "*"))
+ }
+
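The glob handling this test exercises goes through Hadoop's FileSystem.globStatus, as in the Client changes above. A standalone sketch with illustrative paths (globStatus may return null when nothing matches, hence the Option guard):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    val hadoopConf = new Configuration()
    val pattern = new Path("file:///tmp/libs/*.jar")  // illustrative pattern
    val fs = FileSystem.get(pattern.toUri(), hadoopConf)
    Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
      .filter(_.isFile()).foreach { status =>
        // each matching file would be uploaded to the distributed cache individually
        println(status.getPath())
      }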
+ test("distribute jars archive") {
+ val temp = Utils.createTempDir()
+ val archive = TestUtils.createJarWithFiles(Map(), temp)
+
+ val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath())
+ val client = createClient(sparkConf)
+ client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
+ classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+
+ sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
+ intercept[IllegalArgumentException] {
+ client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+ }
+ }
+
+ test("distribute local spark jars") {
+ val temp = Utils.createTempDir()
+ val jarsDir = new File(temp, "lib")
+ assert(jarsDir.mkdir())
+ val jar = TestUtils.createJarWithFiles(Map(), jarsDir)
+
+ val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
+ val client = createClient(sparkConf)
+ client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
+ classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+ }
+
object Fixtures {
val knownDefYarnAppCP: Seq[String] =
@@ -280,4 +342,21 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}.toOption.getOrElse(defaults)
}
+ private def createClient(
+ sparkConf: SparkConf,
+ conf: Configuration = new Configuration(),
+ args: Array[String] = Array()): Client = {
+ val clientArgs = new ClientArguments(args, sparkConf)
+ val client = spy(new Client(clientArgs, conf, sparkConf))
+ doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
+ any(classOf[Path]), anyShort())
+ client
+ }
+
+ private def classpath(client: Client): Array[String] = {
+ val env = new MutableHashMap[String, String]()
+ populateClasspath(null, client.hadoopConf, client.sparkConf, env, false)
+ classpath(env)
+ }
+
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 1dd2f93bb7..0587444a33 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -29,6 +29,7 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.YarnAllocator._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.RpcEndpointRef
@@ -55,7 +56,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
val sparkConf = new SparkConf()
sparkConf.set("spark.driver.host", "localhost")
sparkConf.set("spark.driver.port", "4040")
- sparkConf.set("spark.yarn.jar", "notarealjar.jar")
+ sparkConf.set(SPARK_JARS, Seq("notarealjar.jar"))
sparkConf.set("spark.yarn.launchContainers", "false")
val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)