author    jerryshao <sshao@hortonworks.com>    2016-04-18 10:13:38 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>    2016-04-18 10:13:38 -0700
commit    d6fb485de8b79054db08658d904a3148a04d4180 (patch)
tree      6c3dda740d018c5c6432070216ea2fc46fa42ce4 /yarn/src/main
parent    1a3966472c78794252057d47ff0cffe4329a32f3 (diff)
[SPARK-14423][YARN] Avoid same name files added to distributed cache again
## What changes were proposed in this pull request?

In the current implementation of assembly-free Spark deployment, jars under `assembly/target/scala-xxx/jars` are uploaded to the distributed cache by default. There is a chance that these jars' names conflict with the names of jars specified via `--jars`, which causes an exception when starting the application:

```
     client token: N/A
     diagnostics: Application application_1459907402325_0004 failed 2 times due to AM Container for appattempt_1459907402325_0004_000002 exited with exitCode: -1000
For more detailed output, check application tracking page: http://hw12100.local:8088/proxy/application_1459907402325_0004/ Then, click on links to logs of each attempt.
Diagnostics: Resource hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004/avro-mapred-1.7.7-hadoop2.jar changed on src filesystem (expected 1459909780508, was 1459909782590)
java.io.IOException: Resource hdfs://localhost:8020/user/sshao/.sparkStaging/application_1459907402325_0004/avro-mapred-1.7.7-hadoop2.jar changed on src filesystem (expected 1459909780508, was 1459909782590)
	at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
	at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:61)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
	at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:357)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:356)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:60)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```

This patch checks the file name of each resource before adding it to the distributed cache, so that files with the same name are not uploaded again.

## How was this patch tested?

Unit tests, plus a manual integration test done locally.

Author: jerryshao <sshao@hortonworks.com>

Closes #12203 from jerryshao/SPARK-14423.
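For context, here is a minimal, self-contained sketch of the name-based deduplication described above (not the code from this patch: the standalone `DistCacheDedup` object, the `println` logging, and the example URIs are illustrative only). A resource is rejected when either its full URI or its bare file name has already been registered:

```scala
import java.io.File
import java.net.URI
import scala.collection.mutable.HashSet

object DistCacheDedup {
  // Full URIs already added to the distributed cache.
  private val distributedUris = new HashSet[String]
  // Bare file names already added; two different paths with the same
  // file name would collide in YARN's per-app staging directory.
  private val distributedNames = new HashSet[String]

  // Returns true only if neither the URI nor its file name was seen before.
  def addDistributedUri(uri: URI): Boolean = {
    val uriStr = uri.toString()
    val fileName = new File(uri.getPath).getName
    if (distributedUris.contains(uriStr)) {
      println(s"Same path resource $uri added multiple times to distributed cache.")
      false
    } else if (distributedNames.contains(fileName)) {
      println(s"Same name resource $uri added multiple times to distributed cache.")
      false
    } else {
      distributedUris += uriStr
      distributedNames += fileName
      true
    }
  }

  def main(args: Array[String]): Unit = {
    // First jar is accepted; the second has a different path but the same
    // base name, so it is skipped, avoiding YARN's
    // "Resource ... changed on src filesystem" failure.
    println(addDistributedUri(new URI("hdfs://nn:8020/jars/avro-mapred-1.7.7-hadoop2.jar")))  // true
    println(addDistributedUri(new URI("file:/home/user/libs/avro-mapred-1.7.7-hadoop2.jar"))) // false
  }
}
```

The actual change in `Client.scala`, shown in the diff below, applies the same idea inside the existing `addDistributedUri` helper.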
Diffstat (limited to 'yarn/src/main')
-rw-r--r--    yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala    14
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 04e91f8553..7c168ed279 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -364,6 +364,10 @@ private[spark] class Client(
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
val distributedUris = new HashSet[String]
+ // Used to keep track of the file names of resources added to the distributed cache. If
+ // files with the same name but different paths are added multiple times, YARN will fail
+ // to launch containers for the app with an internal error.
+ val distributedNames = new HashSet[String]
YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
@@ -376,11 +380,16 @@ private[spark] class Client(
def addDistributedUri(uri: URI): Boolean = {
val uriStr = uri.toString()
+ val fileName = new File(uri.getPath).getName
if (distributedUris.contains(uriStr)) {
- logWarning(s"Resource $uri added multiple times to distributed cache.")
+ logWarning(s"Same path resource $uri added multiple times to distributed cache.")
+ false
+ } else if (distributedNames.contains(fileName)) {
+ logWarning(s"Same name resource $uri added multiple times to distributed cache")
false
} else {
distributedUris += uriStr
+ distributedNames += fileName
true
}
}
@@ -519,8 +528,7 @@ private[spark] class Client(
).foreach { case (flist, resType, addToClasspath) =>
flist.foreach { file =>
val (_, localizedPath) = distribute(file, resType = resType)
- require(localizedPath != null)
- if (addToClasspath) {
+ if (addToClasspath && localizedPath != null) {
cachedSecondaryJarLinks += localizedPath
}
}