author     Marcelo Vanzin <vanzin@cloudera.com>    2014-10-08 08:51:17 -0500
committer  Thomas Graves <tgraves@apache.org>      2014-10-08 08:51:17 -0500
commit     a44af7302f814204fdbcc7ad620bc6984b376468 (patch)
tree       50b365e6fb12f49c7bdbb97fd99a95f1c5b19355
parent     a1f833f751783e0d8a0dab4d073ff5e9e70c36f9 (diff)
[SPARK-3788] [yarn] Fix compareFs to do the right thing for HDFS namespaces (1.1 version).

HA and viewfs use namespaces instead of host names, so those names cannot be resolved and trying to do so will fail. Be smarter and avoid the unnecessary work.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #2650 from vanzin/SPARK-3788-1.1 and squashes the following commits:

174bf71 [Marcelo Vanzin] Update comment.
0e36be7 [Marcelo Vanzin] Use Objects.equal() instead of ==.
772aead [Marcelo Vanzin] [SPARK-3788] [yarn] Fix compareFs to do the right thing for HA, federation (1.1 version).
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala  29
1 file changed, 11 insertions(+), 18 deletions(-)
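For reference, here is roughly how compareFs reads once this patch is applied, pieced together from the hunks below. This is a sketch, not the verbatim file contents; the closing "} catch {" line falls between the two hunks and is filled in as assumed.

  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
    val srcUri = srcFs.getUri()
    val dstUri = destFs.getUri()
    if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
      return false
    }

    var srcHost = srcUri.getHost()
    var dstHost = dstUri.getHost()

    // In HA or when using viewfs, the host part of the URI may not actually be a host, but the
    // name of the HDFS namespace. Those names won't resolve, so avoid even trying if they
    // match.
    if (srcHost != null && dstHost != null && srcHost != dstHost) {
      try {
        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
      } catch {
        case e: UnknownHostException =>
          return false
      }
    }

    Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
  }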
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 6da3b16a54..27ee04a5a5 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
import scala.util.{Try, Success, Failure}
+import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
@@ -122,15 +123,17 @@ trait ClientBase extends Logging {
private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
val srcUri = srcFs.getUri()
val dstUri = destFs.getUri()
- if (srcUri.getScheme() == null) {
- return false
- }
- if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+ if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
return false
}
+
var srcHost = srcUri.getHost()
var dstHost = dstUri.getHost()
- if ((srcHost != null) && (dstHost != null)) {
+
+ // In HA or when using viewfs, the host part of the URI may not actually be a host, but the
+ // name of the HDFS namespace. Those names won't resolve, so avoid even trying if they
+ // match.
+ if (srcHost != null && dstHost != null && srcHost != dstHost) {
try {
srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
@@ -138,19 +141,9 @@ trait ClientBase extends Logging {
case e: UnknownHostException =>
return false
}
- if (!srcHost.equals(dstHost)) {
- return false
- }
- } else if (srcHost == null && dstHost != null) {
- return false
- } else if (srcHost != null && dstHost == null) {
- return false
- }
- if (srcUri.getPort() != dstUri.getPort()) {
- false
- } else {
- true
}
+
+ Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
}
/** Copy the file into HDFS if needed. */
@@ -621,7 +614,7 @@ object ClientBase extends Logging {
YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
File.pathSeparator)
- /**
+ /**
* Get the list of namenodes the user may access.
*/
private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
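
To illustrate why the patched compareFs skips DNS resolution when the two host strings already match: with HDFS HA or viewfs, the authority of the filesystem URI is a logical namespace name rather than a resolvable host. A minimal, self-contained sketch; the nameservice ID "nameservice1" is hypothetical and used purely for illustration.

  import java.net.{InetAddress, URI, UnknownHostException}

  object NamespaceHostExample {
    def main(args: Array[String]): Unit = {
      // For an HA/viewfs-style URI the "host" is a logical name such as a nameservice ID.
      val uri = new URI("hdfs://nameservice1/user/alice/data")
      println(uri.getHost) // prints "nameservice1"

      // Resolving such a name typically fails, which is why compareFs now only calls
      // InetAddress.getByName() when the two host strings differ.
      try {
        val canonical = InetAddress.getByName(uri.getHost).getCanonicalHostName
        println(s"resolved to: $canonical")
      } catch {
        case e: UnknownHostException => println(s"cannot resolve: ${e.getMessage}")
      }
    }
  }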