aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenry Saputra <henry.saputra@gmail.com>2014-06-23 17:13:26 -0700
committerReynold Xin <rxin@apache.org>2014-06-23 17:13:26 -0700
commit383bf72c115b09d68cbde0d35ed89808ce04863d (patch)
treec1a0e5ed332125e3c18a8391b2d451e925388717
parent21ddd7d1e9f8e2a726427f32422c31706a20ba3f (diff)
downloadspark-383bf72c115b09d68cbde0d35ed89808ce04863d.tar.gz
spark-383bf72c115b09d68cbde0d35ed89808ce04863d.tar.bz2
spark-383bf72c115b09d68cbde0d35ed89808ce04863d.zip
Cleanup on Connection, ConnectionManagerId, ConnectionManager classes part 2
Cleanup on Connection, ConnectionManagerId, and ConnectionManager classes part 2 while I was working at the code there to help IDE: 1. Remove unused imports 2. Remove parentheses in method calls that do not have side affect. 3. Add parentheses in method calls that do have side effect or not simple get to object properties. 4. Change if-else check (via isInstanceOf) for Connection class type with Scala expression for consistency and cleanliness. 5. Remove semicolon 6. Remove extra spaces. 7. Remove redundant return for consistency Author: Henry Saputra <henry.saputra@gmail.com> Closes #1157 from hsaputra/cleanup_connection_classes_part2 and squashes the following commits: 4be6906 [Henry Saputra] Fix Spark Scala style for line over 100 chars. 85b24f7 [Henry Saputra] Cleanup on Connection and ConnectionManager classes part 2 while I was working at the code there to help IDE: 1. Remove unused imports 2. Remove parentheses in method calls that do not have side affect. 3. Add parentheses in method calls that do have side effect. 4. Change if-else check (via isInstanceOf) for Connection class type with Scala expression for consitency and cleanliness. 5. Remove semicolon 6. Remove extra spaces.
-rw-r--r--core/src/main/scala/org/apache/spark/network/Connection.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManager.scala114
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala2
3 files changed, 62 insertions, 69 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index 3b6298a26d..5285ec82c1 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -17,11 +17,6 @@
package org.apache.spark.network
-import org.apache.spark._
-import org.apache.spark.SparkSaslServer
-
-import scala.collection.mutable.{HashMap, Queue, ArrayBuffer}
-
import java.net._
import java.nio._
import java.nio.channels._
@@ -41,7 +36,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
def this(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId) = {
this(channel_, selector_,
ConnectionManagerId.fromSocketAddress(
- channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]), id_)
+ channel_.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress]), id_)
}
channel.configureBlocking(false)
@@ -89,7 +84,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
private def disposeSasl() {
if (sparkSaslServer != null) {
- sparkSaslServer.dispose();
+ sparkSaslServer.dispose()
}
if (sparkSaslClient != null) {
@@ -328,15 +323,13 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
// Is highly unlikely unless there was an unclean close of socket, etc
registerInterest()
logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
- true
} catch {
case e: Exception => {
logWarning("Error finishing connection to " + address, e)
callOnExceptionCallback(e)
- // ignore
- return true
}
}
+ true
}
override def write(): Boolean = {
@@ -546,7 +539,7 @@ private[spark] class ReceivingConnection(
/* println("Filled buffer at " + System.currentTimeMillis) */
val bufferMessage = inbox.getMessageForChunk(currentChunk).get
if (bufferMessage.isCompletelyReceived) {
- bufferMessage.flip
+ bufferMessage.flip()
bufferMessage.finishTime = System.currentTimeMillis
logDebug("Finished receiving [" + bufferMessage + "] from " +
"[" + getRemoteConnectionManagerId() + "] in " + bufferMessage.timeTaken)
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index cf1c985c2f..8a1cdb8129 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -249,7 +249,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
def run() {
try {
while(!selectorThread.isInterrupted) {
- while (! registerRequests.isEmpty) {
+ while (!registerRequests.isEmpty) {
val conn: SendingConnection = registerRequests.dequeue()
addListeners(conn)
conn.connect()
@@ -308,7 +308,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
// Some keys within the selectors list are invalid/closed. clear them.
val allKeys = selector.keys().iterator()
- while (allKeys.hasNext()) {
+ while (allKeys.hasNext) {
val key = allKeys.next()
try {
if (! key.isValid) {
@@ -341,7 +341,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
if (0 != selectedKeysCount) {
val selectedKeys = selector.selectedKeys().iterator()
- while (selectedKeys.hasNext()) {
+ while (selectedKeys.hasNext) {
val key = selectedKeys.next
selectedKeys.remove()
try {
@@ -419,62 +419,63 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
connectionsByKey -= connection.key
try {
- if (connection.isInstanceOf[SendingConnection]) {
- val sendingConnection = connection.asInstanceOf[SendingConnection]
- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
- logInfo("Removing SendingConnection to " + sendingConnectionManagerId)
-
- connectionsById -= sendingConnectionManagerId
- connectionsAwaitingSasl -= connection.connectionId
+ connection match {
+ case sendingConnection: SendingConnection =>
+ val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
+ logInfo("Removing SendingConnection to " + sendingConnectionManagerId)
+
+ connectionsById -= sendingConnectionManagerId
+ connectionsAwaitingSasl -= connection.connectionId
+
+ messageStatuses.synchronized {
+ messageStatuses.values.filter(_.connectionManagerId == sendingConnectionManagerId)
+ .foreach(status => {
+ logInfo("Notifying " + status)
+ status.synchronized {
+ status.attempted = true
+ status.acked = false
+ status.markDone()
+ }
+ })
- messageStatuses.synchronized {
- messageStatuses
- .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => {
- logInfo("Notifying " + status)
- status.synchronized {
- status.attempted = true
- status.acked = false
- status.markDone()
- }
+ messageStatuses.retain((i, status) => {
+ status.connectionManagerId != sendingConnectionManagerId
})
+ }
+ case receivingConnection: ReceivingConnection =>
+ val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
+ logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId)
- messageStatuses.retain((i, status) => {
- status.connectionManagerId != sendingConnectionManagerId
- })
- }
- } else if (connection.isInstanceOf[ReceivingConnection]) {
- val receivingConnection = connection.asInstanceOf[ReceivingConnection]
- val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
- logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId)
-
- val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
- if (! sendingConnectionOpt.isDefined) {
- logError("Corresponding SendingConnectionManagerId not found")
- return
- }
+ val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
+ if (!sendingConnectionOpt.isDefined) {
+ logError("Corresponding SendingConnectionManagerId not found")
+ return
+ }
- val sendingConnection = sendingConnectionOpt.get
- connectionsById -= remoteConnectionManagerId
- sendingConnection.close()
+ val sendingConnection = sendingConnectionOpt.get
+ connectionsById -= remoteConnectionManagerId
+ sendingConnection.close()
- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
+ val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
- assert (sendingConnectionManagerId == remoteConnectionManagerId)
+ assert(sendingConnectionManagerId == remoteConnectionManagerId)
- messageStatuses.synchronized {
- for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
- logInfo("Notifying " + s)
- s.synchronized {
- s.attempted = true
- s.acked = false
- s.markDone()
+ messageStatuses.synchronized {
+ for (s <- messageStatuses.values
+ if s.connectionManagerId == sendingConnectionManagerId) {
+ logInfo("Notifying " + s)
+ s.synchronized {
+ s.attempted = true
+ s.acked = false
+ s.markDone()
+ }
}
- }
- messageStatuses.retain((i, status) => {
- status.connectionManagerId != sendingConnectionManagerId
- })
- }
+ messageStatuses.retain((i, status) => {
+ status.connectionManagerId != sendingConnectionManagerId
+ })
+ }
+ case _ => logError("Unsupported type of connection.")
}
} finally {
// So that the selection keys can be removed.
@@ -517,13 +518,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
logDebug("Client sasl completed for id: " + waitingConn.connectionId)
connectionsAwaitingSasl -= waitingConn.connectionId
waitingConn.getAuthenticated().synchronized {
- waitingConn.getAuthenticated().notifyAll();
+ waitingConn.getAuthenticated().notifyAll()
}
return
} else {
var replyToken : Array[Byte] = null
try {
- replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken);
+ replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken)
if (waitingConn.isSaslComplete()) {
logDebug("Client sasl completed after evaluate for id: " + waitingConn.connectionId)
connectionsAwaitingSasl -= waitingConn.connectionId
@@ -533,7 +534,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
return
}
val securityMsgResp = SecurityMessage.fromResponse(replyToken,
- securityMsg.getConnectionId.toString())
+ securityMsg.getConnectionId.toString)
val message = securityMsgResp.toBufferMessage
if (message == null) throw new Exception("Error creating security message")
sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message)
@@ -630,13 +631,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
case bufferMessage: BufferMessage => {
if (authEnabled) {
val res = handleAuthentication(connection, bufferMessage)
- if (res == true) {
+ if (res) {
// message was security negotiation so skip the rest
logDebug("After handleAuth result was true, returning")
return
}
}
- if (bufferMessage.hasAckId) {
+ if (bufferMessage.hasAckId()) {
val sentMessageStatus = messageStatuses.synchronized {
messageStatuses.get(bufferMessage.ackId) match {
case Some(status) => {
@@ -646,7 +647,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
case None => {
throw new Exception("Could not find reference for received ack message " +
message.id)
- null
}
}
}
@@ -668,7 +668,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
if (ackMessage.isDefined) {
if (!ackMessage.get.isInstanceOf[BufferMessage]) {
logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type "
- + ackMessage.get.getClass())
+ + ackMessage.get.getClass)
} else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) {
logDebug("Response to " + bufferMessage + " does not have ack id set")
ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
index b82edb6850..57f7586883 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
@@ -32,6 +32,6 @@ private[spark] case class ConnectionManagerId(host: String, port: Int) {
private[spark] object ConnectionManagerId {
def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId = {
- new ConnectionManagerId(socketAddress.getHostName(), socketAddress.getPort())
+ new ConnectionManagerId(socketAddress.getHostName, socketAddress.getPort)
}
}