aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala14
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala8
2 files changed, 18 insertions, 4 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4df90d7b6b..847d1de50f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -160,11 +160,17 @@ private[spark] class ApplicationMaster(
}
// Distribute the conf archive to executors.
- sparkConf.get(CACHED_CONF_ARCHIVE).foreach { uri =>
- val fs = FileSystem.get(new URI(uri), yarnConf)
+ sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path =>
+ val uri = new URI(path)
+ val fs = FileSystem.get(uri, yarnConf)
val status = fs.getFileStatus(new Path(uri))
- setupDistributedCache(uri, LocalResourceType.ARCHIVE, status.getModificationTime().toString,
- status.getLen.toString, LocalResourceVisibility.PRIVATE.name())
+ // SPARK-16080: Make sure to use the correct name for the destination when distributing the
+ // conf archive to executors.
+ val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(),
+ Client.LOCALIZED_CONF_DIR)
+ setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE,
+ status.getModificationTime().toString, status.getLen.toString,
+ LocalResourceVisibility.PRIVATE.name())
}
// Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index c465604845..4ce33e0e85 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -292,6 +292,14 @@ private object YarnClusterDriver extends Logging with Matchers {
sc.stop()
}
+ // Verify that the config archive is correctly placed in the classpath of all containers.
+ val confFile = "/" + Client.SPARK_CONF_FILE
+ assert(getClass().getResource(confFile) != null)
+ val configFromExecutors = sc.parallelize(1 to 4, 4)
+ .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
+ .collect()
+ assert(configFromExecutors.find(_ == null) === None)
+
// verify log urls are present
val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
assert(listeners.size === 1)