aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMridul Muralidharan <mridul@gmail.com>2013-05-01 00:30:30 +0530
committerMridul Muralidharan <mridul@gmail.com>2013-05-01 00:30:30 +0530
commit3b748ced2258246bd9b7c250363645cea27cf622 (patch)
treed467a4ec1da0b5789e9915ab93be572b0cbcec5b /core/src/main
parent0f45477be16254971763cbc07feac7460cffd0bd (diff)
downloadspark-3b748ced2258246bd9b7c250363645cea27cf622.tar.gz
spark-3b748ced2258246bd9b7c250363645cea27cf622.tar.bz2
spark-3b748ced2258246bd9b7c250363645cea27cf622.zip
Be more aggressive and defensive in all uses of SelectionKey in select loop
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala47
1 files changed, 30 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 925d076951..03926a6038 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -232,24 +232,37 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
while(!keyInterestChangeRequests.isEmpty) {
val (key, ops) = keyInterestChangeRequests.dequeue
- val connection = connectionsByKey.getOrElse(key, null)
- if (connection != null) {
- val lastOps = key.interestOps()
- key.interestOps(ops)
-
- // hot loop - prevent materialization of string if trace not enabled.
- if (isTraceEnabled()) {
- def intToOpStr(op: Int): String = {
- val opStrs = ArrayBuffer[String]()
- if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
- if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
- if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
- if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
- if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
- }
- logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() +
- "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
+ try {
+ if (key.isValid) {
+ val connection = connectionsByKey.getOrElse(key, null)
+ if (connection != null) {
+ val lastOps = key.interestOps()
+ key.interestOps(ops)
+
+ // hot loop - prevent materialization of string if trace not enabled.
+ if (isTraceEnabled()) {
+ def intToOpStr(op: Int): String = {
+ val opStrs = ArrayBuffer[String]()
+ if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
+ if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
+ if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
+ if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
+ if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
+ }
+
+ logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() +
+ "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
+ }
+ }
+ } else {
+ logInfo("Key not valid ? " + key)
+ throw new CancelledKeyException()
+ }
+ } catch {
+ case e: CancelledKeyException => {
+ logInfo("key already cancelled ? " + key, e)
+ triggerForceCloseByException(key, e)
}
}
}