diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2016-04-20 16:57:23 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-04-20 16:57:23 -0700 |
commit | f47dbf27fa034629fab12d0f3c89ab75edb03f86 (patch) | |
tree | 2445f305b0ef90e5577ec0bc608f1cf68f28960e /yarn/src/test/scala | |
parent | 334c293ec0bcc2195d502c574ca40dbc4769d666 (diff) | |
download | spark-f47dbf27fa034629fab12d0f3c89ab75edb03f86.tar.gz spark-f47dbf27fa034629fab12d0f3c89ab75edb03f86.tar.bz2 spark-f47dbf27fa034629fab12d0f3c89ab75edb03f86.zip |
[SPARK-14602][YARN] Use SparkConf to propagate the list of cached files.
This change avoids using the environment to pass this information, since
with many jars it's easy to hit limits on certain OSes. Instead, it encodes
the information into the Spark configuration propagated to the AM.
The first problem that needed to be solved is a chicken & egg issue: the
config file is distributed using the cache, and it needs to contain information
about the files that are being distributed. To solve that, the code now treats
the config archive specially, and uses slightly different code to distribute
it, so that only its cache path needs to be saved to the config file.
The second problem is that the extra information would show up in the Web UI,
which made the environment tab even noisier than it already is when lots
of jars are listed. This is solved by two changes: the list of cached files
is now read only once in the AM, and propagated down to the ExecutorRunnable
code (which actually sends the list to the NMs when starting containers). The
second change is to unset those config entries after the list is read, so that
the SparkContext never sees them.
Tested with both client and cluster mode by running "run-example SparkPi". This
uploads a whole lot of files when run from a build dir (instead of a distribution,
where the list is cleaned up), and I verified that the configs do not show
up in the UI.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #12487 from vanzin/SPARK-14602.
Diffstat (limited to 'yarn/src/test/scala')
3 files changed, 48 insertions, 56 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala index ac8f663df2..b696e080ce 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala @@ -33,7 +33,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.mockito.Mockito.when import org.scalatest.mock.MockitoSugar -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.yarn.config._ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar { @@ -84,18 +85,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar assert(resource.getSize() === 0) assert(resource.getType() === LocalResourceType.FILE) - val env = new HashMap[String, String]() - distMgr.setDistFilesEnv(env) - assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link") - assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0") - assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0") - assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name()) - - distMgr.setDistArchivesEnv(env) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) + val sparkConf = new SparkConf(false) + distMgr.updateConfiguration(sparkConf) + assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link")) + assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(0L)) + assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(0L)) + assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === 
Seq(LocalResourceVisibility.PRIVATE.name())) + assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.FILE.name())) // add another one and verify both there and order correct val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", @@ -111,20 +107,22 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar assert(resource2.getSize() === 20) assert(resource2.getType() === LocalResourceType.FILE) - val env2 = new HashMap[String, String]() - distMgr.setDistFilesEnv(env2) - val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') - val files = env2("SPARK_YARN_CACHE_FILES").split(',') - val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') - val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',') + val sparkConf2 = new SparkConf(false) + distMgr.updateConfiguration(sparkConf2) + + val files = sparkConf2.get(CACHED_FILES) + val sizes = sparkConf2.get(CACHED_FILES_SIZES) + val timestamps = sparkConf2.get(CACHED_FILES_TIMESTAMPS) + val visibilities = sparkConf2.get(CACHED_FILES_VISIBILITIES) + assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link") - assert(timestamps(0) === "0") - assert(sizes(0) === "0") + assert(timestamps(0) === 0) + assert(sizes(0) === 0) assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name()) assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2") - assert(timestamps(1) === "10") - assert(sizes(1) === "20") + assert(timestamps(1) === 10) + assert(sizes(1) === 20) assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name()) } @@ -165,18 +163,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar assert(resource.getSize() === 20) assert(resource.getType() === LocalResourceType.ARCHIVE) - val env = new HashMap[String, String]() - distMgr.setDistFilesEnv(env) - assert(env.get("SPARK_YARN_CACHE_FILES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None) 
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None) - - distMgr.setDistArchivesEnv(env) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) + val sparkConf = new SparkConf(false) + distMgr.updateConfiguration(sparkConf) + assert(sparkConf.get(CACHED_FILES) === Nil) + assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Nil) + assert(sparkConf.get(CACHED_FILES_SIZES) === Nil) + assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Nil) + assert(sparkConf.get(CACHED_FILES_TYPES) === Nil) } test("test addResource archive") { @@ -199,20 +192,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar assert(resource.getSize() === 20) assert(resource.getType() === LocalResourceType.ARCHIVE) - val env = new HashMap[String, String]() - - distMgr.setDistArchivesEnv(env) - assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link") - assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10") - assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20") - assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name()) - - distMgr.setDistFilesEnv(env) - assert(env.get("SPARK_YARN_CACHE_FILES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None) - assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None) + val sparkConf = new SparkConf(false) + distMgr.updateConfiguration(sparkConf) + assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link")) + assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(20L)) + assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(10L)) + 
assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name())) + assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.ARCHIVE.name())) } - } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 06efd44b5d..f196a0d8ca 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -243,9 +243,12 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll assert(sparkConf.get(SPARK_JARS) === Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*"))) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort()) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort()) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort(), + anyBoolean(), any()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort(), + anyBoolean(), any()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort(), + anyBoolean(), any()) val cp = classpath(client) cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) @@ -262,7 +265,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val client = createClient(sparkConf) client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort(), + anyBoolean(), any()) classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + 
":" + archive.getPath()) @@ -281,7 +285,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath())) val client = createClient(sparkConf) client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort()) + verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(), + anyBoolean(), any()) classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) } @@ -382,7 +387,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val clientArgs = new ClientArguments(args) val client = spy(new Client(clientArgs, conf, sparkConf)) doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), - any(classOf[Path]), anyShort()) + any(classOf[Path]), anyShort(), anyBoolean(), any()) client } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index a641a6e73e..784c6525e5 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -104,7 +104,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter sparkConfClone, rmClient, appAttemptId, - new SecurityManager(sparkConf)) + new SecurityManager(sparkConf), + Map()) } def createContainer(host: String): Container = { |