Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala     | 10 +++++-----
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala             | 10 ----------
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala |  2 --
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala           | 17 -----------------
4 files changed, 5 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 77b8ca1cce..e462d6f273 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -32,15 +32,16 @@ import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AkkaUtils, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
private[spark] sealed trait MapOutputTrackerMessage
-private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
+private[spark] case class GetMapOutputStatuses(shuffleId: Int)
extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster)
extends Actor with Logging {
def receive = {
- case GetMapOutputStatuses(shuffleId: Int, requester: String) =>
- logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester)
+ case GetMapOutputStatuses(shuffleId: Int) =>
+ val hostPort = sender.path.address.hostPort
+ logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
case StopMapOutputTracker =>
@@ -119,11 +120,10 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
if (fetchedStatuses == null) {
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
- val hostPort = Utils.localHostPort(conf)
// This try-finally prevents hangs due to timeouts:
try {
val fetchedBytes =
- askTracker(GetMapOutputStatuses(shuffleId, hostPort)).asInstanceOf[Array[Byte]]
+ askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
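
The change above replaces an explicit requester field with Akka's implicit sender reference: the master actor recovers the caller's address from the ActorRef it is replying to, so the message no longer needs to carry it. A minimal self-contained sketch of that technique, with hypothetical actor and message names (not Spark's):

import akka.actor.Actor

case class GetStatuses(shuffleId: Int)

class TrackerSketchActor extends Actor {
  def receive = {
    case GetStatuses(shuffleId) =>
      // For a message that arrived over Akka remoting, sender.path.address
      // identifies the remote actor system, and hostPort renders it as
      // "system@host:port"; for a purely local sender, host and port are
      // absent and only the system name remains.
      val hostPort = sender.path.address.hostPort
      println("shuffle " + shuffleId + " requested by " + hostPort)
      sender ! Array.empty[Byte]
  }
}
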
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 08b592df71..ed788560e7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -132,16 +132,6 @@ object SparkEnv extends Logging {
conf.set("spark.driver.port", boundPort.toString)
}
- // set only if unset until now.
- if (!conf.contains("spark.hostPort")) {
- if (!isDriver){
- // unexpected
- Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
- }
- Utils.checkHost(hostname)
- conf.set("spark.hostPort", hostname + ":" + boundPort)
- }
-
val classLoader = Thread.currentThread.getContextClassLoader
// Create an instance of the class named by the given Java system property, or by
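
The deleted block was the writer of spark.hostPort; with its readers (Utils.localHostPort, BlockManager.hostPort) removed in the same commit, the property drops out entirely. For reference, a short script-style sketch of the "set only if unset" idiom that block used, written against SparkConf's public API (the value below is a placeholder for illustration):

import org.apache.spark.SparkConf

val conf = new SparkConf()

// "Set only if unset": the idiom the deleted block applied to
// spark.hostPort, and which the surviving code above still applies
// to spark.driver.port.
if (!conf.contains("spark.driver.port")) {
  conf.set("spark.driver.port", "0")  // placeholder value for the sketch
}

// After this change nothing writes spark.hostPort, so any remaining
// reader must treat it as optional rather than assume it is set.
val hostPort: Option[String] = conf.getOption("spark.hostPort")
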
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ff9f241fc1..b6dcfa1bca 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -83,8 +83,6 @@ private[spark] class BlockManager(
val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
- val hostPort = Utils.localHostPort(conf)
-
val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 23b72701c2..34813c1537 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -420,15 +420,6 @@ private[spark] object Utils extends Logging {
InetAddress.getByName(address).getHostName
}
- def localHostPort(conf: SparkConf): String = {
- val retval = conf.get("spark.hostPort", null)
- if (retval == null) {
- logErrorWithStack("spark.hostPort not set but invoking localHostPort")
- return localHostName()
- }
- retval
- }
-
def checkHost(host: String, message: String = "") {
assert(host.indexOf(':') == -1, message)
}
@@ -437,14 +428,6 @@ private[spark] object Utils extends Logging {
assert(hostPort.indexOf(':') != -1, message)
}
- def logErrorWithStack(msg: String) {
- try {
- throw new Exception
- } catch {
- case ex: Exception => logError(msg, ex)
- }
- }
-
// Typically, this will be of order of number of nodes in cluster
// If not, we should change it to LRUCache or something.
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
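
The comment above justifies caching parse results in an unbounded ConcurrentHashMap: the key set stays on the order of the number of cluster nodes, so eviction is unnecessary. A hedged sketch of the memoized parse such a map typically backs; the splitting logic here is an assumption for illustration, not necessarily Spark's exact implementation:

import java.util.concurrent.ConcurrentHashMap

object HostPortCache {
  // Mirrors the declaration above; grows without bound, which is
  // acceptable while keys roughly track cluster nodes.
  private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()

  def parseHostPort(hostPort: String): (String, Int) = {
    val cached = hostPortParseResults.get(hostPort)
    if (cached != null) return cached

    val idx = hostPort.lastIndexOf(':')
    val parsed =
      if (idx == -1) (hostPort, 0) // no port present; 0 as a sketch default
      else (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)

    // putIfAbsent keeps concurrent first lookups consistent: whichever
    // thread wins the race, all callers see the same cached tuple.
    hostPortParseResults.putIfAbsent(hostPort, parsed)
    hostPortParseResults.get(hostPort)
  }
}
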