-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala | 31
1 file changed, 12 insertions(+), 19 deletions(-)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 6ecac6eae6..14a0386b78 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
import scala.util.{Try, Success, Failure}

+import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
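Aside: the new Guava import exists to support the null-safe host comparison at the end of compareFs (see the last hunk below). As a quick sketch of the semantics being relied on, Objects.equal treats two nulls as equal and never throws a NullPointerException:

    import com.google.common.base.Objects

    Objects.equal(null, null)   // true:  neither URI carries a host
    Objects.equal("nn1", null)  // false: only one side has a host
    Objects.equal("nn1", "nn1") // true:  delegates to String.equals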
@@ -64,12 +65,12 @@ private[spark] trait ClientBase extends Logging {
s"memory capability of the cluster ($maxMem MB per container)")
val executorMem = args.executorMemory + executorMemoryOverhead
if (executorMem > maxMem) {
- throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
+ throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
}
val amMem = args.amMemory + amMemoryOverhead
if (amMem > maxMem) {
- throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
+ throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
}
logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
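For context, the checks in this hunk reject any request whose memory plus overhead exceeds the largest container YARN will grant. A minimal sketch of the arithmetic, with hypothetical numbers (the 8192 MB limit and 384 MB overhead below are illustrative values, not defaults taken from this code):

    // Hypothetical values, for illustration only.
    val maxMem = 8192                 // largest container the cluster will allocate, in MB
    val executorMemory = 8192         // what the user asked for, in MB
    val executorMemoryOverhead = 384  // off-heap overhead Spark adds on top, in MB

    val executorMem = executorMemory + executorMemoryOverhead  // 8576 MB
    // 8576 > 8192, so this request would hit the IllegalArgumentException above;
    // the user must either lower the executor memory or raise the container limit.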
@@ -771,15 +772,17 @@ private[spark] object ClientBase extends Logging {
private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
val srcUri = srcFs.getUri()
val dstUri = destFs.getUri()
- if (srcUri.getScheme() == null) {
- return false
- }
- if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+ if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
return false
}
+
var srcHost = srcUri.getHost()
var dstHost = dstUri.getHost()
- if ((srcHost != null) && (dstHost != null)) {
+
+ // In HA or when using viewfs, the host part of the URI may not actually be a host, but the
+ // name of the HDFS namespace. Those names won't resolve, so avoid even trying if they
+ // match.
+ if (srcHost != null && dstHost != null && srcHost != dstHost) {
try {
srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
@@ -787,19 +790,9 @@ private[spark] object ClientBase extends Logging {
case e: UnknownHostException =>
return false
}
- if (!srcHost.equals(dstHost)) {
- return false
- }
- } else if (srcHost == null && dstHost != null) {
- return false
- } else if (srcHost != null && dstHost == null) {
- return false
- }
- if (srcUri.getPort() != dstUri.getPort()) {
- false
- } else {
- true
}
+
+ Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
}
}
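Assembled from the hunks above, the patched compareFs reads as follows (indentation reconstructed; the body is taken from the diff itself). Host resolution is only attempted when both URIs carry a host and those hosts differ textually, which is what keeps unresolvable HA/viewfs namespace names from failing the comparison:

    private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
      val srcUri = srcFs.getUri()
      val dstUri = destFs.getUri()
      if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
        return false
      }

      var srcHost = srcUri.getHost()
      var dstHost = dstUri.getHost()

      // In HA or when using viewfs, the host part of the URI may not actually be a host, but
      // the name of the HDFS namespace. Those names won't resolve, so avoid even trying if
      // they match.
      if (srcHost != null && dstHost != null && srcHost != dstHost) {
        try {
          srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
          dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
        } catch {
          case e: UnknownHostException =>
            return false
        }
      }

      Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
    }

Note the trade-off this encodes: two identical namespace names are treated as the same filesystem without any lookup, while genuinely different hostnames still go through getCanonicalHostName() so that aliases of the same machine compare equal.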