aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-01-30 15:02:53 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-01-30 15:02:53 -0800
commitd12330bd2cd354919c414ee7ef367bc28c0eb7c9 (patch)
treeac0932c730287ac76e69990942c79497f74801ff
parent612a9fee718ffb285806223c1a19df80d7979d20 (diff)
parent16a0789e10d2ac714e7c623b026c4a58ca9678d6 (diff)
downloadspark-d12330bd2cd354919c414ee7ef367bc28c0eb7c9.tar.gz
spark-d12330bd2cd354919c414ee7ef367bc28c0eb7c9.tar.bz2
spark-d12330bd2cd354919c414ee7ef367bc28c0eb7c9.zip
Merge pull request #426 from woggling/conn-manager-ips
Remember ConnectionManagerId used to initiate SendingConnections
-rw-r--r--core/src/main/scala/spark/network/Connection.scala15
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala3
2 files changed, 13 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index c193bf7c8d..cd5b7d57f3 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -12,7 +12,14 @@ import java.net._
private[spark]
-abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
+abstract class Connection(val channel: SocketChannel, val selector: Selector,
+ val remoteConnectionManagerId: ConnectionManagerId) extends Logging {
+ def this(channel_ : SocketChannel, selector_ : Selector) = {
+ this(channel_, selector_,
+ ConnectionManagerId.fromSocketAddress(
+ channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]
+ ))
+ }
channel.configureBlocking(false)
channel.socket.setTcpNoDelay(true)
@@ -25,7 +32,6 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex
var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
val remoteAddress = getRemoteAddress()
- val remoteConnectionManagerId = ConnectionManagerId.fromSocketAddress(remoteAddress)
def key() = channel.keyFor(selector)
@@ -103,8 +109,9 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex
}
-private[spark] class SendingConnection(val address: InetSocketAddress, selector_ : Selector)
-extends Connection(SocketChannel.open, selector_) {
+private[spark] class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
+ remoteId_ : ConnectionManagerId)
+extends Connection(SocketChannel.open, selector_, remoteId_) {
class Outbox(fair: Int = 0) {
val messages = new Queue[Message]()
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 2ecd14f536..c7f226044d 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -299,7 +299,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
def startNewConnection(): SendingConnection = {
val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
- val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId, new SendingConnection(inetSocketAddress, selector))
+ val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId,
+ new SendingConnection(inetSocketAddress, selector, connectionManagerId))
newConnection
}
val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)