diff options
author | Reynold Xin <rxin@apache.org> | 2014-01-12 22:35:14 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-01-12 22:35:14 -0800 |
commit | e6ed13f255d70de422711b979447690cdab7423b (patch) | |
tree | ac4774bf41c478fce612477dff77933845022802 /core | |
parent | 0b96d85c2063bd2864b5753496551c6cf2f9047a (diff) | |
parent | 0bb33076e2c12947edc91ff61f98e4b72d881ec3 (diff) | |
download | spark-e6ed13f255d70de422711b979447690cdab7423b.tar.gz spark-e6ed13f255d70de422711b979447690cdab7423b.tar.bz2 spark-e6ed13f255d70de422711b979447690cdab7423b.zip |
Merge pull request #397 from pwendell/host-port
Remove the now-unneeded hostPort option
I noticed this was logging some scary error messages in various places. After I looked into it, this option is no longer really used. I removed the option and rewrote the one remaining use case (it was unnecessary there anyway).
Diffstat (limited to 'core')
9 files changed, 6 insertions, 42 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 80a611b180..30d182b008 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -32,15 +32,16 @@ import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} private[spark] sealed trait MapOutputTrackerMessage -private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String) +private[spark] case class GetMapOutputStatuses(shuffleId: Int) extends MapOutputTrackerMessage private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster) extends Actor with Logging { def receive = { - case GetMapOutputStatuses(shuffleId: Int, requester: String) => - logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester) + case GetMapOutputStatuses(shuffleId: Int) => + val hostPort = sender.path.address.hostPort + logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort) sender ! 
tracker.getSerializedMapOutputStatuses(shuffleId) case StopMapOutputTracker => @@ -119,11 +120,10 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { if (fetchedStatuses == null) { // We won the race to fetch the output locs; do so logInfo("Doing the fetch; tracker actor = " + trackerActor) - val hostPort = Utils.localHostPort(conf) // This try-finally prevents hangs due to timeouts: try { val fetchedBytes = - askTracker(GetMapOutputStatuses(shuffleId, hostPort)).asInstanceOf[Array[Byte]] + askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]] fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes) logInfo("Got the output locations") mapStatuses.put(shuffleId, fetchedStatuses) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 08b592df71..ed788560e7 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -132,16 +132,6 @@ object SparkEnv extends Logging { conf.set("spark.driver.port", boundPort.toString) } - // set only if unset until now. 
- if (!conf.contains("spark.hostPort")) { - if (!isDriver){ - // unexpected - Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set") - } - Utils.checkHost(hostname) - conf.set("spark.hostPort", hostname + ":" + boundPort) - } - val classLoader = Thread.currentThread.getContextClassLoader // Create an instance of the class named by the given Java system property, or by diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index f9e43e0e94..45b43b403d 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -103,7 +103,6 @@ private[spark] object CoarseGrainedExecutorBackend { indestructible = true, conf = new SparkConf) // set it val sparkHostPort = hostname + ":" + boundPort -// conf.set("spark.hostPort", sparkHostPort) actorSystem.actorOf( Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), name = "Executor") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 8d596a76c2..0208388e86 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -165,7 +165,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A override def start() { val properties = new ArrayBuffer[(String, String)] for ((key, value) <- scheduler.sc.conf.getAll) { - if (key.startsWith("spark.") && !key.equals("spark.hostPort")) { + if (key.startsWith("spark.")) { properties += ((key, value)) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e1fc0c707f..6f1345c57a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -83,8 +83,6 @@ private[spark] class BlockManager( val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf) - val hostPort = Utils.localHostPort(conf) - val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)), name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a98adf9835..caa9bf4c92 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -420,15 +420,6 @@ private[spark] object Utils extends Logging { InetAddress.getByName(address).getHostName } - def localHostPort(conf: SparkConf): String = { - val retval = conf.get("spark.hostPort", null) - if (retval == null) { - logErrorWithStack("spark.hostPort not set but invoking localHostPort") - return localHostName() - } - retval - } - def checkHost(host: String, message: String = "") { assert(host.indexOf(':') == -1, message) } @@ -437,14 +428,6 @@ private[spark] object Utils extends Logging { assert(hostPort.indexOf(':') != -1, message) } - def logErrorWithStack(msg: String) { - try { - throw new Exception - } catch { - case ex: Exception => logError(msg, ex) - } - } - // Typically, this will be of order of number of nodes in cluster // If not, we should change it to LRUCache or something. 
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]() diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala index 8dd5786da6..3ac706110e 100644 --- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala @@ -53,7 +53,6 @@ object LocalSparkContext { } // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") } /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index afc1beff98..930c2523ca 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -99,7 +99,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf) System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext - System.setProperty("spark.hostPort", hostname + ":" + boundPort) val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index f60ce270c7..18aa587662 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -53,7 +53,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", 
"localhost", 0, conf = conf) this.actorSystem = actorSystem conf.set("spark.driver.port", boundPort.toString) - conf.set("spark.hostPort", "localhost:" + boundPort) master = new BlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf) @@ -65,13 +64,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT conf.set("spark.storage.disableBlockManagerHeartBeat", "true") val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() - // Set some value ... - conf.set("spark.hostPort", Utils.localHostName() + ":" + 1111) } after { System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") if (store != null) { store.stop() |