aboutsummaryrefslogtreecommitdiff
path: root/yarn/src/test/scala/org
diff options
context:
space:
mode:
author Marcelo Vanzin <vanzin@cloudera.com> 2016-04-20 16:57:23 -0700
committer Marcelo Vanzin <vanzin@cloudera.com> 2016-04-20 16:57:23 -0700
commit f47dbf27fa034629fab12d0f3c89ab75edb03f86 (patch)
tree 2445f305b0ef90e5577ec0bc608f1cf68f28960e /yarn/src/test/scala/org
parent 334c293ec0bcc2195d502c574ca40dbc4769d666 (diff)
download spark-f47dbf27fa034629fab12d0f3c89ab75edb03f86.tar.gz
spark-f47dbf27fa034629fab12d0f3c89ab75edb03f86.tar.bz2
spark-f47dbf27fa034629fab12d0f3c89ab75edb03f86.zip
[SPARK-14602][YARN] Use SparkConf to propagate the list of cached files.
This change avoids using the environment to pass this information, since with many jars it's easy to hit limits on certain OSes. Instead, it encodes the information into the Spark configuration propagated to the AM. The first problem that needed to be solved is a chicken-and-egg issue: the config file is distributed using the cache, and it needs to contain information about the files that are being distributed. To solve that, the code now treats the config archive specially, and uses slightly different code to distribute it, so that only its cache path needs to be saved to the config file. The second problem is that the extra information would show up in the Web UI, which made the environment tab even noisier than it already is when lots of jars are listed. This is solved by two changes. First, the list of cached files is now read only once in the AM, and propagated down to the ExecutorRunnable code (which actually sends the list to the NMs when starting containers). Second, those config entries are unset after the list is read, so that the SparkContext never sees them. Tested with both client and cluster mode by running "run-example SparkPi". This uploads a whole lot of files when run from a build dir (instead of a distribution, where the list is cleaned up), and I verified that the configs do not show up in the UI. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #12487 from vanzin/SPARK-14602.
Diffstat (limited to 'yarn/src/test/scala/org')
-rw-r--r-- yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala 84
-rw-r--r-- yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala 17
-rw-r--r-- yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala 3
3 files changed, 48 insertions, 56 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index ac8f663df2..b696e080ce 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -33,7 +33,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.mockito.Mockito.when
import org.scalatest.mock.MockitoSugar
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar {
@@ -84,18 +85,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 0)
assert(resource.getType() === LocalResourceType.FILE)
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0")
- assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+ val sparkConf = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf)
+ assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
+ assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(0L))
+ assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(0L))
+ assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
+ assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.FILE.name()))
// add another one and verify both there and order correct
val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
@@ -111,20 +107,22 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource2.getSize() === 20)
assert(resource2.getType() === LocalResourceType.FILE)
- val env2 = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env2)
- val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val files = env2("SPARK_YARN_CACHE_FILES").split(',')
- val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
- val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
+ val sparkConf2 = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf2)
+
+ val files = sparkConf2.get(CACHED_FILES)
+ val sizes = sparkConf2.get(CACHED_FILES_SIZES)
+ val timestamps = sparkConf2.get(CACHED_FILES_TIMESTAMPS)
+ val visibilities = sparkConf2.get(CACHED_FILES_VISIBILITIES)
+
assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(timestamps(0) === "0")
- assert(sizes(0) === "0")
+ assert(timestamps(0) === 0)
+ assert(sizes(0) === 0)
assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name())
assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2")
- assert(timestamps(1) === "10")
- assert(sizes(1) === "20")
+ assert(timestamps(1) === 10)
+ assert(sizes(1) === 20)
assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name())
}
@@ -165,18 +163,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
- val env = new HashMap[String, String]()
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
-
- distMgr.setDistArchivesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
+ val sparkConf = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf)
+ assert(sparkConf.get(CACHED_FILES) === Nil)
+ assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Nil)
+ assert(sparkConf.get(CACHED_FILES_SIZES) === Nil)
+ assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Nil)
+ assert(sparkConf.get(CACHED_FILES_TYPES) === Nil)
}
test("test addResource archive") {
@@ -199,20 +192,13 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(resource.getSize() === 20)
assert(resource.getType() === LocalResourceType.ARCHIVE)
- val env = new HashMap[String, String]()
-
- distMgr.setDistArchivesEnv(env)
- assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20")
- assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name())
-
- distMgr.setDistFilesEnv(env)
- assert(env.get("SPARK_YARN_CACHE_FILES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None)
- assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None)
+ val sparkConf = new SparkConf(false)
+ distMgr.updateConfiguration(sparkConf)
+ assert(sparkConf.get(CACHED_FILES) === Seq("file:/foo.invalid.com:8080/tmp/testing#link"))
+ assert(sparkConf.get(CACHED_FILES_SIZES) === Seq(20L))
+ assert(sparkConf.get(CACHED_FILES_TIMESTAMPS) === Seq(10L))
+ assert(sparkConf.get(CACHED_FILES_VISIBILITIES) === Seq(LocalResourceVisibility.PRIVATE.name()))
+ assert(sparkConf.get(CACHED_FILES_TYPES) === Seq(LocalResourceType.ARCHIVE.name()))
}
-
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 06efd44b5d..f196a0d8ca 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -243,9 +243,12 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
assert(sparkConf.get(SPARK_JARS) ===
Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort())
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort())
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort(),
+ anyBoolean(), any())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort(),
+ anyBoolean(), any())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort(),
+ anyBoolean(), any())
val cp = classpath(client)
cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
@@ -262,7 +265,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort(),
+ anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
@@ -281,7 +285,8 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
val client = createClient(sparkConf)
client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
- verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
+ verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort(),
+ anyBoolean(), any())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}
@@ -382,7 +387,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val clientArgs = new ClientArguments(args)
val client = spy(new Client(clientArgs, conf, sparkConf))
doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
- any(classOf[Path]), anyShort())
+ any(classOf[Path]), anyShort(), anyBoolean(), any())
client
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index a641a6e73e..784c6525e5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -104,7 +104,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
sparkConfClone,
rmClient,
appAttemptId,
- new SecurityManager(sparkConf))
+ new SecurityManager(sparkConf),
+ Map())
}
def createContainer(host: String): Container = {