aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/storage/BlockManagerId.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/storage/BlockManagerId.scala')
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerId.scala62
1 files changed, 43 insertions, 19 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala
index f2f1e77d41..1e557d6148 100644
--- a/core/src/main/scala/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerId.scala
@@ -2,51 +2,70 @@ package spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap
+import spark.Utils
/**
* This class represent an unique identifier for a BlockManager.
* The first 2 constructors of this class is made private to ensure that
- * BlockManagerId objects can be created only using the factory method in
- * [[spark.storage.BlockManager$]]. This allows de-duplication of ID objects.
+ * BlockManagerId objects can be created only using the apply method in
+ * the companion object. This allows de-duplication of ID objects.
* Also, constructor parameters are private to ensure that parameters cannot
* be modified from outside this class.
*/
private[spark] class BlockManagerId private (
private var executorId_ : String,
- private var ip_ : String,
- private var port_ : Int
+ private var host_ : String,
+ private var port_ : Int,
+ private var nettyPort_ : Int
) extends Externalizable {
- private def this() = this(null, null, 0) // For deserialization only
+ private def this() = this(null, null, 0, 0) // For deserialization only
def executorId: String = executorId_
- def ip: String = ip_
+ if (null != host_){
+ Utils.checkHost(host_, "Expected hostname")
+ assert (port_ > 0)
+ }
+
+ def hostPort: String = {
+ // DEBUG code
+ Utils.checkHost(host)
+ assert (port > 0)
+
+ host + ":" + port
+ }
+
+ def host: String = host_
def port: Int = port_
+ def nettyPort: Int = nettyPort_
+
override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
- out.writeUTF(ip_)
+ out.writeUTF(host_)
out.writeInt(port_)
+ out.writeInt(nettyPort_)
}
override def readExternal(in: ObjectInput) {
executorId_ = in.readUTF()
- ip_ = in.readUTF()
+ host_ = in.readUTF()
port_ = in.readInt()
+ nettyPort_ = in.readInt()
}
@throws(classOf[IOException])
private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
- override def toString = "BlockManagerId(%s, %s, %d)".format(executorId, ip, port)
+ override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port, nettyPort)
- override def hashCode: Int = (executorId.hashCode * 41 + ip.hashCode) * 41 + port
+ override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort
override def equals(that: Any) = that match {
case id: BlockManagerId =>
- executorId == id.executorId && port == id.port && ip == id.ip
+ executorId == id.executorId && port == id.port && host == id.host && nettyPort == id.nettyPort
case _ =>
false
}
@@ -55,8 +74,17 @@ private[spark] class BlockManagerId private (
private[spark] object BlockManagerId {
- def apply(execId: String, ip: String, port: Int) =
- getCachedBlockManagerId(new BlockManagerId(execId, ip, port))
+ /**
+ * Returns a [[spark.storage.BlockManagerId]] for the given configuraiton.
+ *
+ * @param execId ID of the executor.
+ * @param host Host name of the block manager.
+ * @param port Port of the block manager.
+ * @param nettyPort Optional port for the Netty-based shuffle sender.
+ * @return A new [[spark.storage.BlockManagerId]].
+ */
+ def apply(execId: String, host: String, port: Int, nettyPort: Int) =
+ getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))
def apply(in: ObjectInput) = {
val obj = new BlockManagerId()
@@ -67,11 +95,7 @@ private[spark] object BlockManagerId {
val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
- if (blockManagerIdCache.containsKey(id)) {
- blockManagerIdCache.get(id)
- } else {
- blockManagerIdCache.put(id, id)
- id
- }
+ blockManagerIdCache.putIfAbsent(id, id)
+ blockManagerIdCache.get(id)
}
}