diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2014-06-23 08:51:11 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-06-23 08:51:11 -0500 |
commit | e380767de344fd6898429de43da592658fd86a39 (patch) | |
tree | cfc059fbb4bfe58573ca6c96b35e82a7af664fb6 /yarn/common/src/test/scala | |
parent | 9cb64b2c54b35eed373f54c2103f679b04e9af1e (diff) | |
download | spark-e380767de344fd6898429de43da592658fd86a39.tar.gz spark-e380767de344fd6898429de43da592658fd86a39.tar.bz2 spark-e380767de344fd6898429de43da592658fd86a39.zip |
[SPARK-1395] Fix "local:" URI support in Yarn mode (again).
Recent changes ignored the fact that path may be defined with "local:"
URIs, which means they need to be explicitly added to the classpath
everywhere a remote process is started. This change fixes that by:
- Using the correct methods to add paths to the classpath
- Creating SparkConf settings for the Spark jar itself and for the
user's jar
- Propagating those two settings to the remote processes where needed
This ensures that both in client and in cluster mode, the driver has
the necessary info to build the executor's classpath and have things
still work when they contain "local:" references.
The change also fixes some confusion in ClientBase about whether
to use SparkConf or system properties to propagate config options to
the driver and executors, by standardizing on using data held by
SparkConf.
On the cleanup front, I removed the hacky way that log4j configuration
was being propagated to handle the "local:" case. It's much more cleanly
(and generically) handled by using spark-submit arguments (--files to
upload a config file, or setting spark.executor.extraJavaOptions to pass
JVM arguments and use a local file).
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #560 from vanzin/yarn-local-2 and squashes the following commits:
4e7f066 [Marcelo Vanzin] Correctly propagate SPARK_JAVA_OPTS to driver/executor.
6a454ea [Marcelo Vanzin] Use constants for PWD in test.
6dd5943 [Marcelo Vanzin] Fix propagation of config options to driver / executor.
b2e377f [Marcelo Vanzin] Review feedback.
93c3f85 [Marcelo Vanzin] Fix ClassCastException in test.
e5c682d [Marcelo Vanzin] Fix cluster mode, restore SPARK_LOG4J_CONF.
1dfbb40 [Marcelo Vanzin] Add documentation for spark.yarn.jar.
bbdce05 [Marcelo Vanzin] [SPARK-1395] Fix "local:" URI support in Yarn mode (again).
Diffstat (limited to 'yarn/common/src/test/scala')
-rw-r--r-- | yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala | 90 |
1 files changed, 87 insertions, 3 deletions
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala index 608c6e9262..686714dc36 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -17,22 +17,31 @@ package org.apache.spark.deploy.yarn +import java.io.File import java.net.URI +import com.google.common.io.Files import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants.Environment - +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.mockito.Matchers._ +import org.mockito.Mockito._ import org.scalatest.FunSuite -import org.scalatest.matchers.ShouldMatchers._ +import org.scalatest.Matchers import scala.collection.JavaConversions._ import scala.collection.mutable.{ HashMap => MutableHashMap } import scala.util.Try +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils -class ClientBaseSuite extends FunSuite { +class ClientBaseSuite extends FunSuite with Matchers { test("default Yarn application classpath") { ClientBase.getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) @@ -68,6 +77,67 @@ class ClientBaseSuite extends FunSuite { } } + private val SPARK = "local:/sparkJar" + private val USER = "local:/userJar" + private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" + + test("Local jar URIs") { + val conf = new Configuration() + val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) + val env = new MutableHashMap[String, String]() + val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) + + ClientBase.populateClasspath(args, conf, sparkConf, env, None) + + val cp = env("CLASSPATH").split(File.pathSeparator) + s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => + val uri = new URI(entry) + if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) { + cp should contain (uri.getPath()) + } else { + cp should not contain (uri.getPath()) + } + }) + cp should contain (Environment.PWD.$()) + cp should contain (s"${Environment.PWD.$()}${File.separator}*") + cp should not contain (ClientBase.SPARK_JAR) + cp should not contain (ClientBase.APP_JAR) + } + + test("Jar path propagation through SparkConf") { + val conf = new Configuration() + val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) + val yarnConf = new YarnConfiguration() + val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) + + val client = spy(new DummyClient(args, conf, sparkConf, yarnConf)) + doReturn(new Path("/")).when(client).copyRemoteFile(any(classOf[Path]), + any(classOf[Path]), anyShort(), anyBoolean()) + + var tempDir = Files.createTempDir(); + try { + client.prepareLocalResources(tempDir.getAbsolutePath()) + sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER)) + + // The non-local path should be propagated by name only, since it will end up in the app's + // staging dir. + val expected = ADDED.split(",") + .map(p => { + val uri = new URI(p) + if (ClientBase.LOCAL_SCHEME == uri.getScheme()) { + p + } else { + Option(uri.getFragment()).getOrElse(new File(p).getName()) + } + }) + .mkString(",") + + sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected)) + } finally { + Utils.deleteRecursively(tempDir) + } + } + object Fixtures { val knownDefYarnAppCP: Seq[String] = @@ -109,4 +179,18 @@ class ClientBaseSuite extends FunSuite { def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults) + private class DummyClient( + val args: ClientArguments, + val conf: Configuration, + val sparkConf: SparkConf, + val yarnConf: YarnConfiguration) extends ClientBase { + + override def calculateAMMemory(newApp: GetNewApplicationResponse): Int = + throw new UnsupportedOperationException() + + override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = + throw new UnsupportedOperationException() + + } + } |