From 0d4886c000589d62a61a568aeedeacfeb1c0fc6d Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 12 Jan 2014 16:44:58 -0800 Subject: Remove now un-needed hostPort option --- .../main/scala/org/apache/spark/MapOutputTracker.scala | 10 +++++----- core/src/main/scala/org/apache/spark/SparkEnv.scala | 10 ---------- .../scala/org/apache/spark/storage/BlockManager.scala | 2 -- core/src/main/scala/org/apache/spark/util/Utils.scala | 17 ----------------- 4 files changed, 5 insertions(+), 34 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 77b8ca1cce..e462d6f273 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -32,15 +32,16 @@ import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} private[spark] sealed trait MapOutputTrackerMessage -private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String) +private[spark] case class GetMapOutputStatuses(shuffleId: Int) extends MapOutputTrackerMessage private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster) extends Actor with Logging { def receive = { - case GetMapOutputStatuses(shuffleId: Int, requester: String) => - logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester) + case GetMapOutputStatuses(shuffleId: Int) => + val hostPort = sender.path.address.hostPort + logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort) sender ! tracker.getSerializedMapOutputStatuses(shuffleId) case StopMapOutputTracker => @@ -119,11 +120,10 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { if (fetchedStatuses == null) { // We won the race to fetch the output locs; do so logInfo("Doing the fetch; tracker actor = " + trackerActor) - val hostPort = Utils.localHostPort(conf) // This try-finally prevents hangs due to timeouts: try { val fetchedBytes = - askTracker(GetMapOutputStatuses(shuffleId, hostPort)).asInstanceOf[Array[Byte]] + askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]] fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes) logInfo("Got the output locations") mapStatuses.put(shuffleId, fetchedStatuses) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 08b592df71..ed788560e7 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -132,16 +132,6 @@ object SparkEnv extends Logging { conf.set("spark.driver.port", boundPort.toString) } - // set only if unset until now. - if (!conf.contains("spark.hostPort")) { - if (!isDriver){ - // unexpected - Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set") - } - Utils.checkHost(hostname) - conf.set("spark.hostPort", hostname + ":" + boundPort) - } - val classLoader = Thread.currentThread.getContextClassLoader // Create an instance of the class named by the given Java system property, or by diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index ff9f241fc1..b6dcfa1bca 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -83,8 +83,6 @@ private[spark] class BlockManager( val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf) - val hostPort = Utils.localHostPort(conf) - val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)), name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 23b72701c2..34813c1537 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -420,15 +420,6 @@ private[spark] object Utils extends Logging { InetAddress.getByName(address).getHostName } - def localHostPort(conf: SparkConf): String = { - val retval = conf.get("spark.hostPort", null) - if (retval == null) { - logErrorWithStack("spark.hostPort not set but invoking localHostPort") - return localHostName() - } - retval - } - def checkHost(host: String, message: String = "") { assert(host.indexOf(':') == -1, message) } @@ -437,14 +428,6 @@ private[spark] object Utils extends Logging { assert(hostPort.indexOf(':') != -1, message) } - def logErrorWithStack(msg: String) { - try { - throw new Exception - } catch { - case ex: Exception => logError(msg, ex) - } - } - // Typically, this will be of order of number of nodes in cluster // If not, we should change it to LRUCache or something. private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]() -- cgit v1.2.3