-rw-r--r--  core/src/main/scala/org/apache/spark/HttpServer.scala                                        |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala                          |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala                          |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/WebUI.scala                                           |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                                         | 34
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala   |  3
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala |  3
7 files changed, 29 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index 09a9ccc226..8de3a6c04d 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -160,7 +160,7 @@ private[spark] class HttpServer(
throw new ServerStateException("Server is not started")
} else {
val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
- s"$scheme://${Utils.localIpAddress}:$port"
+ s"$scheme://${Utils.localHostNameForURI()}:$port"
}
}
}
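
Note on this hunk: Utils.localIpAddress returned a bare IP string, and interpolating a bare IPv6 literal into "scheme://host:port" yields a URL whose host and port cannot be separated. A minimal sketch of the failure and the fix, using hypothetical addresses and java.net.URI:

import java.net.URI

// Unbracketed IPv6: java.net.URI falls back to a registry-based authority,
// so no host can be extracted from the URL.
println(new URI("http://2001:db8::1:8080").getHost)   // null
// Bracketed IPv6, as produced by Utils.localHostNameForURI():
val ok = new URI("http://[2001:db8::1]:8080")
println(ok.getHost + " " + ok.getPort)                // [2001:db8::1] 8080
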
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 3ab425aab8..f0e77c2ba9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -53,7 +53,7 @@ class LocalSparkCluster(
/* Start the Master */
val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
masterActorSystems += masterSystem
- val masterUrl = "spark://" + localHostname + ":" + masterPort
+ val masterUrl = "spark://" + Utils.localHostNameForURI() + ":" + masterPort
val masters = Array(masterUrl)
/* Start the Workers */
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index c1c4812f17..40835b9550 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -46,7 +46,7 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
val conf = new SparkConf
- val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+ val (actorSystem, _) = AkkaUtils.createActorSystem("spark", Utils.localHostName(), 0,
conf = conf, securityManager = new SecurityManager(conf))
val desc = new ApplicationDescription("TestClient", Some(1), 512,
Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index ea548f2312..f9860d1a5c 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -48,7 +48,7 @@ private[spark] abstract class WebUI(
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
- protected val localHostName = Utils.localHostName()
+ protected val localHostName = Utils.localHostNameForURI()
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
private val className = Utils.getFormattedClassName(this)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0fdfaf300e..a541d660cd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,6 +34,7 @@ import scala.util.Try
import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.{ByteStreams, Files}
+import com.google.common.net.InetAddresses
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
@@ -789,13 +790,12 @@ private[spark] object Utils extends Logging {
* Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
* Note, this is typically not used from within core spark.
*/
- lazy val localIpAddress: String = findLocalIpAddress()
- lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress)
+ private lazy val localIpAddress: InetAddress = findLocalInetAddress()
- private def findLocalIpAddress(): String = {
+ private def findLocalInetAddress(): InetAddress = {
val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
if (defaultIpOverride != null) {
- defaultIpOverride
+ InetAddress.getByName(defaultIpOverride)
} else {
val address = InetAddress.getLocalHost
if (address.isLoopbackAddress) {
@@ -806,15 +806,20 @@ private[spark] object Utils extends Logging {
// It's more proper to pick ip address following system output order.
val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList
val reOrderedNetworkIFs = if (isWindows) activeNetworkIFs else activeNetworkIFs.reverse
+
for (ni <- reOrderedNetworkIFs) {
- for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
- !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
+ val addresses = ni.getInetAddresses.toList
+ .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress)
+ if (addresses.nonEmpty) {
+ val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
+ // Inet6Address.getHostAddress may append a scope id (e.g. "%eth0") when the interface is known, so rebuild the address from its raw bytes to drop it
+ val strippedAddress = InetAddress.getByAddress(addr.getAddress)
// We've found an address that looks reasonable!
logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
- " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
- " instead (on interface " + ni.getName + ")")
+ " a loopback address: " + address.getHostAddress + "; using " +
+ strippedAddress.getHostAddress + " instead (on interface " + ni.getName + ")")
logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
- return addr.getHostAddress
+ return strippedAddress
}
}
logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
@@ -822,7 +827,7 @@ private[spark] object Utils extends Logging {
" external IP address!")
logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
}
- address.getHostAddress
+ address
}
}
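
The InetAddress.getByAddress(addr.getAddress) round-trip in the hunk above is the scope-stripping trick: rebuilding an Inet6Address from its raw 16 bytes discards any scope id. A self-contained sketch, with an address constructed by hand for illustration only:

import java.net.{Inet6Address, InetAddress}

// fe80::1 with scope id 1 attached.
val bytes = Array[Byte](0xfe.toByte, 0x80.toByte, 0, 0, 0, 0, 0, 0,
                        0, 0, 0, 0, 0, 0, 0, 1)
val scoped = Inet6Address.getByAddress(null, bytes, 1)
println(scoped.getHostAddress)                           // fe80:0:0:0:0:0:0:1%1
// Rebuilding from the raw bytes drops the "%1" suffix.
println(InetAddress.getByAddress(bytes).getHostAddress)  // fe80:0:0:0:0:0:0:1
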
@@ -842,11 +847,14 @@ private[spark] object Utils extends Logging {
* Get the local machine's hostname.
*/
def localHostName(): String = {
- customHostname.getOrElse(localIpAddressHostname)
+ customHostname.getOrElse(localIpAddress.getHostAddress)
}
- def getAddressHostName(address: String): String = {
- InetAddress.getByName(address).getHostName
+ /**
+ * Get the local machine's host name in a form safe to embed in a URI (IPv6 literals are bracketed).
+ */
+ def localHostNameForURI(): String = {
+ customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
}
def checkHost(host: String, message: String = "") {
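
The new localHostNameForURI() delegates to Guava's InetAddresses.toUriString, which leaves IPv4 in dotted-quad form and brackets IPv6 literals. Roughly, with illustrative addresses:

import java.net.InetAddress
import com.google.common.net.InetAddresses

println(InetAddresses.toUriString(InetAddress.getByName("192.168.1.10"))) // 192.168.1.10
println(InetAddresses.toUriString(InetAddress.getByName("2001:db8::1")))  // [2001:db8::1]

So localHostName() keeps returning the raw form, suitable for binding sockets, while localHostNameForURI() is the one to interpolate into the spark:// and http(s):// URLs seen in the hunks above.
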
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 1bd1f32429..a7fe4476ca 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -23,6 +23,7 @@ import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
@@ -118,7 +119,7 @@ private[kinesis] class KinesisReceiver(
* method.
*/
override def onStart() {
- workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID()
+ workerId = Utils.localHostName() + ":" + UUID.randomUUID()
credentialsProvider = new DefaultAWSCredentialsProviderChain()
kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
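
InetAddress.getLocalHost.getHostAddress ignores the SPARK_LOCAL_IP and custom-hostname overrides and can resolve to a loopback address; Utils.localHostName() goes through the override-aware path changed above. The worker id keeps the same host:uuid shape, sketched here with a hypothetical host value:

import java.util.UUID

val host = "ip-10-0-0-12"                    // stand-in for Utils.localHostName()
val workerId = host + ":" + UUID.randomUUID()
println(workerId)                            // e.g. ip-10-0-0-12:6f9c0e2a-...
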
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 158c225159..97b46a01ba 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.scheduler.StatsReportListener
import org.apache.spark.sql.hive.{HiveShim, HiveContext}
import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.util.Utils
/** A singleton object for the master program. The slaves should not access this. */
private[hive] object SparkSQLEnv extends Logging {
@@ -37,7 +38,7 @@ private[hive] object SparkSQLEnv extends Logging {
val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking")
sparkConf
- .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}")
+ .setAppName(s"SparkSQL::${Utils.localHostName()}")
.set("spark.sql.hive.version", HiveShim.version)
.set(
"spark.serializer",