diff options
8 files changed, 227 insertions, 80 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index aaccf49eeb..ff8c631585 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -656,7 +656,9 @@ private[spark] object SparkConf extends Logging { "spark.memory.offHeap.enabled" -> Seq( AlternateConfig("spark.unsafe.offHeap", "1.6")), "spark.rpc.message.maxSize" -> Seq( - AlternateConfig("spark.akka.frameSize", "1.6")) + AlternateConfig("spark.akka.frameSize", "1.6")), + "spark.yarn.jars" -> Seq( + AlternateConfig("spark.yarn.jar", "2.0")) ) /** diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e8d0c3f9c3..4049fc0c41 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -626,7 +626,6 @@ object SparkSubmit { val pathConfigs = Seq( "spark.jars", "spark.files", - "spark.yarn.jar", "spark.yarn.dist.files", "spark.yarn.dist.archives") pathConfigs.foreach { config => diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index ad66b9f64a..8045f8c5b8 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -272,14 +272,25 @@ If you need a reference to the proper location to put log files in the YARN so t </td> </tr> <tr> - <td><code>spark.yarn.jar</code></td> + <td><code>spark.yarn.jars</code></td> <td>(none)</td> <td> - The location of the Spark jar file, in case overriding the default location is desired. - By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be + List of libraries containing Spark code to distribute to YARN containers. + By default, Spark on YARN will use Spark jars installed locally, but the Spark jars can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't - need to be distributed each time an application runs. To point to a jar on HDFS, for example, - set this configuration to <code>hdfs:///some/path</code>. + need to be distributed each time an application runs. To point to jars on HDFS, for example, + set this configuration to <code>hdfs:///some/path</code>. Globs are allowed. + </td> +</tr> +<tr> + <td><code>spark.yarn.archive</code></td> + <td>(none)</td> + <td> + An archive containing needed Spark jars for distribution to the YARN cache. If set, this + configuration replaces <code>spark.yarn.jars</code> and the archive is used in all the + application's containers. The archive should contain jar files in its root directory. + Like with the previous option, the archive can also be hosted on HDFS to speed up file + distribution. </td> </tr> <tr> @@ -288,8 +299,8 @@ If you need a reference to the proper location to put log files in the YARN so t <td> A comma-separated list of secure HDFS namenodes your Spark application is going to access. For example, <code>spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032, - webhdfs://nn3.com:50070</code>. The Spark application must have access to the namenodes listed - and Kerberos must be properly configured to be able to access them (either in the same realm + webhdfs://nn3.com:50070</code>. The Spark application must have access to the namenodes listed + and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters. </td> diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6ca9669002..0b5ceb768c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -423,7 +423,63 @@ private[spark] class Client( } /** - * Copy the given main resource to the distributed cache if the scheme is not "local". + * Add Spark to the cache. There are two settings that control what files to add to the cache: + * - if a Spark archive is defined, use the archive. The archive is expected to contain + * jar files at its root directory. + * - if a list of jars is provided, filter the non-local ones, resolve globs, and + * add the found files to the cache. + * + * Note that the archive cannot be a "local" URI. If none of the above settings are found, + * then upload all files found in $SPARK_HOME/jars. + * + * TODO: currently the code looks in $SPARK_HOME/lib while the work to replace assemblies + * with a directory full of jars is ongoing. + */ + val sparkArchive = sparkConf.get(SPARK_ARCHIVE) + if (sparkArchive.isDefined) { + val archive = sparkArchive.get + require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.") + distribute(Utils.resolveURI(archive).toString, + resType = LocalResourceType.ARCHIVE, + destName = Some(LOCALIZED_LIB_DIR)) + } else { + sparkConf.get(SPARK_JARS) match { + case Some(jars) => + // Break the list of jars to upload, and resolve globs. + val localJars = new ArrayBuffer[String]() + jars.foreach { jar => + if (!isLocalUri(jar)) { + val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf) + val pathFs = FileSystem.get(path.toUri(), hadoopConf) + pathFs.globStatus(path).filter(_.isFile()).foreach { entry => + distribute(entry.getPath().toUri().toString(), + targetDir = Some(LOCALIZED_LIB_DIR)) + } + } else { + localJars += jar + } + } + + // Propagate the local URIs to the containers using the configuration. + sparkConf.set(SPARK_JARS, localJars) + + case None => + // No configuration, so fall back to uploading local jar files. + logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " + + "to uploading libraries under SPARK_HOME.") + val jarsDir = new File(sparkConf.getenv("SPARK_HOME"), "lib") + if (jarsDir.isDirectory()) { + jarsDir.listFiles().foreach { f => + if (f.isFile() && f.getName().toLowerCase().endsWith(".jar")) { + distribute(f.getAbsolutePath(), targetDir = Some(LOCALIZED_LIB_DIR)) + } + } + } + } + } + + /** + * Copy a few resources to the distributed cache if their scheme is not "local". * Otherwise, set the corresponding key in our SparkConf to handle it downstream. * Each resource is represented by a 3-tuple of: * (1) destination resource name, @@ -431,8 +487,7 @@ private[spark] class Client( * (3) Spark property key to set if the scheme is not local */ List( - (SPARK_JAR_NAME, sparkJar(sparkConf), SPARK_JAR.key), - (APP_JAR_NAME, args.userJar, APP_JAR.key), + (APP_JAR_NAME, args.userJar, APP_JAR), ("log4j.properties", oldLog4jConf.orNull, null) ).foreach { case (destName, path, confKey) => if (path != null && !path.trim().isEmpty()) { @@ -1062,8 +1117,7 @@ object Client extends Logging { new Client(args, sparkConf).run() } - // Alias for the Spark assembly jar and the user jar - val SPARK_JAR_NAME: String = "__spark__.jar" + // Alias for the user jar val APP_JAR_NAME: String = "__app__.jar" // URI scheme that identifies local resources @@ -1072,8 +1126,6 @@ object Client extends Logging { // Staging directory for any temporary jars or files val SPARK_STAGING: String = ".sparkStaging" - // Location of any user-defined Spark jars - val ENV_SPARK_JAR = "SPARK_JAR" // Staging directory is private! -> rwx-------- val STAGING_DIR_PERMISSION: FsPermission = @@ -1095,28 +1147,8 @@ object Client extends Logging { // Subdirectory where the user's python files (not archives) will be placed. val LOCALIZED_PYTHON_DIR = "__pyfiles__" - /** - * Find the user-defined Spark jar if configured, or return the jar containing this - * class if not. - * - * This method first looks in the SparkConf object for the spark.yarn.jar key, and in the - * user environment if that is not found (for backwards compatibility). - */ - private def sparkJar(conf: SparkConf): String = { - conf.get(SPARK_JAR).getOrElse( - if (System.getenv(ENV_SPARK_JAR) != null) { - logWarning( - s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " + - s"in favor of the ${SPARK_JAR.key} configuration variable.") - System.getenv(ENV_SPARK_JAR) - } else { - SparkContext.jarOfClass(this.getClass).getOrElse(throw new SparkException("Could not " - + "find jar containing Spark classes. The jar can be defined using the " - + s"${SPARK_JAR.key} configuration option. If testing Spark, either set that option " - + "or make sure SPARK_PREPEND_CLASSES is not set.")) - } - ) - } + // Subdirectory where Spark libraries will be placed. + val LOCALIZED_LIB_DIR = "__spark_libs__" /** * Return the path to the given application's staging directory. @@ -1236,7 +1268,18 @@ object Client extends Logging { addFileToClasspath(sparkConf, conf, x, null, env) } } - addFileToClasspath(sparkConf, conf, new URI(sparkJar(sparkConf)), SPARK_JAR_NAME, env) + + // Add the Spark jars to the classpath, depending on how they were distributed. + addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), + LOCALIZED_LIB_DIR, "*"), env) + if (!sparkConf.get(SPARK_ARCHIVE).isDefined) { + sparkConf.get(SPARK_JARS).foreach { jars => + jars.filter(isLocalUri).foreach { jar => + addClasspathEntry(getClusterPath(sparkConf, jar), env) + } + } + } + populateHadoopClasspath(conf, env) sys.env.get(ENV_DIST_CLASSPATH).foreach { cp => addClasspathEntry(getClusterPath(sparkConf, cp), env) @@ -1392,4 +1435,9 @@ object Client extends Logging { components.mkString(Path.SEPARATOR) } + /** Returns whether the URI is a "local:" URI. */ + def isLocalUri(uri: String): Boolean = { + uri.startsWith(s"$LOCAL_SCHEME:") + } + } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 06c1be9bf0..10cd6d00b0 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -72,11 +72,17 @@ package object config { /* File distribution. */ - private[spark] val SPARK_JAR = ConfigBuilder("spark.yarn.jar") - .doc("Location of the Spark jar to use.") + private[spark] val SPARK_ARCHIVE = ConfigBuilder("spark.yarn.archive") + .doc("Location of archive containing jars files with Spark classes.") .stringConf .optional + private[spark] val SPARK_JARS = ConfigBuilder("spark.yarn.jars") + .doc("Location of jars containing Spark classes.") + .stringConf + .toSequence + .optional + private[spark] val ARCHIVES_TO_DISTRIBUTE = ConfigBuilder("spark.yarn.dist.archives") .stringConf .optional diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 272e2454da..b12e506033 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -34,6 +34,7 @@ import org.scalatest.{BeforeAndAfterAll, Matchers} import org.scalatest.concurrent.Eventually._ import org.apache.spark._ +import org.apache.spark.deploy.yarn.config._ import org.apache.spark.launcher._ import org.apache.spark.util.Utils @@ -202,7 +203,7 @@ abstract class BaseYarnClusterSuite extraClassPath: Seq[String] = Nil, extraConf: Map[String, String] = Map()): String = { val props = new Properties() - props.put("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath()) + props.put(SPARK_JARS.key, "local:" + fakeSparkJar.getAbsolutePath()) val testClasspath = new TestClasspathBuilder() .buildClassPath( diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index b57c179d89..24472e006b 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -36,17 +36,19 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.YarnClientApplication import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.Records -import org.mockito.Matchers._ +import org.mockito.Matchers.{eq => meq, _} import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterAll, Matchers} -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils} import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.util.{ResetSystemProperties, Utils} +import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils} class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll with ResetSystemProperties { + import Client._ + var oldSystemProperties: Properties = null override def beforeAll(): Unit = { @@ -65,35 +67,35 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll } test("default Yarn application classpath") { - Client.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) + getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) } test("default MR application classpath") { - Client.getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) + getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) } test("resultant classpath for an application that defines a classpath for YARN") { withAppConf(Fixtures.mapYARNAppConf) { conf => val env = newEnv - Client.populateHadoopClasspath(conf, env) + populateHadoopClasspath(conf, env) classpath(env) should be( - flatten(Fixtures.knownYARNAppCP, Client.getDefaultMRApplicationClasspath)) + flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath)) } } test("resultant classpath for an application that defines a classpath for MR") { withAppConf(Fixtures.mapMRAppConf) { conf => val env = newEnv - Client.populateHadoopClasspath(conf, env) + populateHadoopClasspath(conf, env) classpath(env) should be( - flatten(Client.getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) + flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) } } test("resultant classpath for an application that defines both classpaths, YARN and MR") { withAppConf(Fixtures.mapAppConf) { conf => val env = newEnv - Client.populateHadoopClasspath(conf, env) + populateHadoopClasspath(conf, env) classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) } } @@ -102,47 +104,43 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll private val USER = "local:/userJar" private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" + private val PWD = + if (classOf[Environment].getMethods().exists(_.getName == "$$")) { + "{{PWD}}" + } else if (Utils.isWindows) { + "%PWD%" + } else { + Environment.PWD.$() + } + test("Local jar URIs") { val conf = new Configuration() val sparkConf = new SparkConf() - .set(SPARK_JAR, SPARK) + .set(SPARK_JARS, Seq(SPARK)) .set(USER_CLASS_PATH_FIRST, true) val env = new MutableHashMap[String, String]() val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - Client.populateClasspath(args, conf, sparkConf, env, true) + populateClasspath(args, conf, sparkConf, env, true) val cp = env("CLASSPATH").split(":|;|<CPS>") s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => val uri = new URI(entry) - if (Client.LOCAL_SCHEME.equals(uri.getScheme())) { + if (LOCAL_SCHEME.equals(uri.getScheme())) { cp should contain (uri.getPath()) } else { cp should not contain (uri.getPath()) } }) - val pwdVar = - if (classOf[Environment].getMethods().exists(_.getName == "$$")) { - "{{PWD}}" - } else if (Utils.isWindows) { - "%PWD%" - } else { - Environment.PWD.$() - } - cp should contain(pwdVar) - cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_CONF_DIR}") - cp should not contain (Client.SPARK_JAR_NAME) - cp should not contain (Client.APP_JAR_NAME) + cp should contain(PWD) + cp should contain (s"$PWD${Path.SEPARATOR}${LOCALIZED_CONF_DIR}") + cp should not contain (APP_JAR) } test("Jar path propagation through SparkConf") { - val conf = new Configuration() - val sparkConf = new SparkConf().set(SPARK_JAR, SPARK) - val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - - val client = spy(new Client(args, conf, sparkConf)) - doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), - any(classOf[Path]), anyShort()) + val sparkConf = new SparkConf().set(SPARK_JARS, Seq(SPARK)) + val client = createClient(sparkConf, + args = Array("--jar", USER, "--addJars", ADDED)) val tempDir = Utils.createTempDir() try { @@ -154,7 +152,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val expected = ADDED.split(",") .map(p => { val uri = new URI(p) - if (Client.LOCAL_SCHEME == uri.getScheme()) { + if (LOCAL_SCHEME == uri.getScheme()) { p } else { Option(uri.getFragment()).getOrElse(new File(p).getName()) @@ -171,16 +169,16 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll test("Cluster path translation") { val conf = new Configuration() val sparkConf = new SparkConf() - .set(SPARK_JAR.key, "local:/localPath/spark.jar") + .set(SPARK_JARS, Seq("local:/localPath/spark.jar")) .set(GATEWAY_ROOT_PATH, "/localPath") .set(REPLACEMENT_ROOT_PATH, "/remotePath") - Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath") - Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be ( + getClusterPath(sparkConf, "/localPath") should be ("/remotePath") + getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be ( "/remotePath/1:/remotePath/2") val env = new MutableHashMap[String, String]() - Client.populateClasspath(null, conf, sparkConf, env, false, + populateClasspath(null, conf, sparkConf, env, false, extraClassPath = Some("/localPath/my1.jar")) val cp = classpath(env) cp should contain ("/remotePath/spark.jar") @@ -220,6 +218,70 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll appContext.getMaxAppAttempts should be (42) } + test("spark.yarn.jars with multiple paths and globs") { + val libs = Utils.createTempDir() + val single = Utils.createTempDir() + val jar1 = TestUtils.createJarWithFiles(Map(), libs) + val jar2 = TestUtils.createJarWithFiles(Map(), libs) + val jar3 = TestUtils.createJarWithFiles(Map(), single) + val jar4 = TestUtils.createJarWithFiles(Map(), single) + + val jarsConf = Seq( + s"${libs.getAbsolutePath()}/*", + jar3.getPath(), + s"local:${jar4.getPath()}", + s"local:${single.getAbsolutePath()}/*") + + val sparkConf = new SparkConf().set(SPARK_JARS, jarsConf) + val client = createClient(sparkConf) + + val tempDir = Utils.createTempDir() + client.prepareLocalResources(tempDir.getAbsolutePath(), Nil) + + assert(sparkConf.get(SPARK_JARS) === + Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*"))) + + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort()) + + val cp = classpath(client) + cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) + cp should not contain (jar3.getPath()) + cp should contain (jar4.getPath()) + cp should contain (buildPath(single.getAbsolutePath(), "*")) + } + + test("distribute jars archive") { + val temp = Utils.createTempDir() + val archive = TestUtils.createJarWithFiles(Map(), temp) + + val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath()) + val client = createClient(sparkConf) + client.prepareLocalResources(temp.getAbsolutePath(), Nil) + + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort()) + classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) + + sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath()) + intercept[IllegalArgumentException] { + client.prepareLocalResources(temp.getAbsolutePath(), Nil) + } + } + + test("distribute local spark jars") { + val temp = Utils.createTempDir() + val jarsDir = new File(temp, "lib") + assert(jarsDir.mkdir()) + val jar = TestUtils.createJarWithFiles(Map(), jarsDir) + + val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath())) + val client = createClient(sparkConf) + client.prepareLocalResources(temp.getAbsolutePath(), Nil) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort()) + classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) + } + object Fixtures { val knownDefYarnAppCP: Seq[String] = @@ -280,4 +342,21 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll }.toOption.getOrElse(defaults) } + private def createClient( + sparkConf: SparkConf, + conf: Configuration = new Configuration(), + args: Array[String] = Array()): Client = { + val clientArgs = new ClientArguments(args, sparkConf) + val client = spy(new Client(clientArgs, conf, sparkConf)) + doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), + any(classOf[Path]), anyShort()) + client + } + + private def classpath(client: Client): Array[String] = { + val env = new MutableHashMap[String, String]() + populateClasspath(null, client.hadoopConf, client.sparkConf, env, false) + classpath(env) + } + } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 1dd2f93bb7..0587444a33 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -29,6 +29,7 @@ import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterEach, Matchers} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.yarn.config._ import org.apache.spark.deploy.yarn.YarnAllocator._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.rpc.RpcEndpointRef @@ -55,7 +56,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val sparkConf = new SparkConf() sparkConf.set("spark.driver.host", "localhost") sparkConf.set("spark.driver.port", "4040") - sparkConf.set("spark.yarn.jar", "notarealjar.jar") + sparkConf.set(SPARK_JARS, Seq("notarealjar.jar")) sparkConf.set("spark.yarn.launchContainers", "false") val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0) |