aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMridul Muralidharan <mridul@gmail.com>2013-05-01 00:05:32 +0530
committerMridul Muralidharan <mridul@gmail.com>2013-05-01 00:05:32 +0530
commit538614acfe95b0c064679122af3bc990b669e4e0 (patch)
tree9b1232efdd3bdf211e88ca6d2303bae6d5563542 /core
parent48854e1dbf1d02e1e19f59d0aee0e281d41b3b45 (diff)
downloadspark-538614acfe95b0c064679122af3bc990b669e4e0.tar.gz
spark-538614acfe95b0c064679122af3bc990b669e4e0.tar.bz2
spark-538614acfe95b0c064679122af3bc990b669e4e0.zip
Be more aggressive and defensive in select also
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala83
1 files changed, 55 insertions, 28 deletions
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 2d9b4be4b3..9b00fddd40 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -254,7 +254,32 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
}
}
- val selectedKeysCount = selector.select()
+ val selectedKeysCount =
+ try {
+ selector.select()
+ } catch {
+ case e: CancelledKeyException => {
+ // Some keys within the selectors list are invalid/closed. clear them.
+ val allKeys = selector.keys().iterator()
+
+ while (allKeys.hasNext()) {
+ val key = allKeys.next()
+ try {
+ if (! key.isValid) {
+ logInfo("Key not valid ? " + key)
+ throw new CancelledKeyException()
+ }
+ } catch {
+ case e: CancelledKeyException => {
+ logInfo("key already cancelled ? " + key, e)
+ triggerForceCloseByException(key, e)
+ }
+ }
+ }
+ }
+ 0
+ }
+
if (selectedKeysCount == 0) {
logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
}
@@ -262,34 +287,36 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
logInfo("Selector thread was interrupted!")
return
}
-
- val selectedKeys = selector.selectedKeys().iterator()
- while (selectedKeys.hasNext()) {
- val key = selectedKeys.next
- selectedKeys.remove()
- try {
- if (key.isValid) {
- if (key.isAcceptable) {
- acceptConnection(key)
- } else
- if (key.isConnectable) {
- triggerConnect(key)
- } else
- if (key.isReadable) {
- triggerRead(key)
- } else
- if (key.isWritable) {
- triggerWrite(key)
+
+ if (0 != selectedKeysCount) {
+ val selectedKeys = selector.selectedKeys().iterator()
+ while (selectedKeys.hasNext()) {
+ val key = selectedKeys.next
+ selectedKeys.remove()
+ try {
+ if (key.isValid) {
+ if (key.isAcceptable) {
+ acceptConnection(key)
+ } else
+ if (key.isConnectable) {
+ triggerConnect(key)
+ } else
+ if (key.isReadable) {
+ triggerRead(key)
+ } else
+ if (key.isWritable) {
+ triggerWrite(key)
+ }
+ } else {
+ logInfo("Key not valid ? " + key)
+ throw new CancelledKeyException()
+ }
+ } catch {
+ // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
+ case e: CancelledKeyException => {
+ logInfo("key already cancelled ? " + key, e)
+ triggerForceCloseByException(key, e)
}
- } else {
- logInfo("Key not valid ? " + key)
- throw new CancelledKeyException()
- }
- } catch {
- // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
- case e: CancelledKeyException => {
- logInfo("key already cancelled ? " + key, e)
- triggerForceCloseByException(key, e)
}
}
}