aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-02-05 21:08:21 -0600
committerStephen Haberman <stephen@exigencecorp.com>2013-02-05 21:08:21 -0600
commit67df7f2fa2e09487fe8dcf39ab80606d95383ea5 (patch)
tree6f234cb3f8a3f23708f09be10b61eff56235043b
parent9cfa06837998f30e50b160bc7aaaad3b33a23c5e (diff)
downloadspark-67df7f2fa2e09487fe8dcf39ab80606d95383ea5.tar.gz
spark-67df7f2fa2e09487fe8dcf39ab80606d95383ea5.tar.bz2
spark-67df7f2fa2e09487fe8dcf39ab80606d95383ea5.zip
Add private, minor formatting.
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala35
1 files changed, 14 insertions, 21 deletions
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index c7f226044d..b6ec664d7e 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -66,31 +66,28 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " with id = " + id)
- val thisInstance = this
val selectorThread = new Thread("connection-manager-thread") {
- override def run() {
- thisInstance.run()
- }
+ override def run() = ConnectionManager.this.run()
}
selectorThread.setDaemon(true)
selectorThread.start()
- def run() {
+ private def run() {
try {
while(!selectorThread.isInterrupted) {
- for( (connectionManagerId, sendingConnection) <- connectionRequests) {
+ for ((connectionManagerId, sendingConnection) <- connectionRequests) {
sendingConnection.connect()
addConnection(sendingConnection)
connectionRequests -= connectionManagerId
}
sendMessageRequests.synchronized {
- while(!sendMessageRequests.isEmpty) {
+ while (!sendMessageRequests.isEmpty) {
val (message, connection) = sendMessageRequests.dequeue
connection.send(message)
}
}
- while(!keyInterestChangeRequests.isEmpty) {
+ while (!keyInterestChangeRequests.isEmpty) {
val (key, ops) = keyInterestChangeRequests.dequeue
val connection = connectionsByKey(key)
val lastOps = key.interestOps()
@@ -126,14 +123,11 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
if (key.isValid) {
if (key.isAcceptable) {
acceptConnection(key)
- } else
- if (key.isConnectable) {
+ } else if (key.isConnectable) {
connectionsByKey(key).asInstanceOf[SendingConnection].finishConnect()
- } else
- if (key.isReadable) {
+ } else if (key.isReadable) {
connectionsByKey(key).read()
- } else
- if (key.isWritable) {
+ } else if (key.isWritable) {
connectionsByKey(key).write()
}
}
@@ -144,7 +138,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
}
}
- def acceptConnection(key: SelectionKey) {
+ private def acceptConnection(key: SelectionKey) {
val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
val newChannel = serverChannel.accept()
val newConnection = new ReceivingConnection(newChannel, selector)
@@ -154,7 +148,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
logInfo("Accepted connection from [" + newConnection.remoteAddress.getAddress + "]")
}
- def addConnection(connection: Connection) {
+ private def addConnection(connection: Connection) {
connectionsByKey += ((connection.key, connection))
if (connection.isInstanceOf[SendingConnection]) {
val sendingConnection = connection.asInstanceOf[SendingConnection]
@@ -165,7 +159,7 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
connection.onClose(removeConnection)
}
- def removeConnection(connection: Connection) {
+ private def removeConnection(connection: Connection) {
connectionsByKey -= connection.key
if (connection.isInstanceOf[SendingConnection]) {
val sendingConnection = connection.asInstanceOf[SendingConnection]
@@ -222,16 +216,16 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
}
}
- def handleConnectionError(connection: Connection, e: Exception) {
+ private def handleConnectionError(connection: Connection, e: Exception) {
logInfo("Handling connection error on connection to " + connection.remoteConnectionManagerId)
removeConnection(connection)
}
- def changeConnectionKeyInterest(connection: Connection, ops: Int) {
+ private def changeConnectionKeyInterest(connection: Connection, ops: Int) {
keyInterestChangeRequests += ((connection.key, ops))
}
- def receiveMessage(connection: Connection, message: Message) {
+ private def receiveMessage(connection: Connection, message: Message) {
val connectionManagerId = ConnectionManagerId.fromSocketAddress(message.senderAddress)
logDebug("Received [" + message + "] from [" + connectionManagerId + "]")
val runnable = new Runnable() {
@@ -351,7 +345,6 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private[spark] object ConnectionManager {
def main(args: Array[String]) {
-
val manager = new ConnectionManager(9999)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
println("Received [" + msg + "] from [" + id + "]")