diff options
author | Mridul Muralidharan <mridul@gmail.com> | 2013-05-01 00:05:32 +0530 |
---|---|---|
committer | Mridul Muralidharan <mridul@gmail.com> | 2013-05-01 00:05:32 +0530 |
commit | 538614acfe95b0c064679122af3bc990b669e4e0 (patch) | |
tree | 9b1232efdd3bdf211e88ca6d2303bae6d5563542 /core | |
parent | 48854e1dbf1d02e1e19f59d0aee0e281d41b3b45 (diff) | |
download | spark-538614acfe95b0c064679122af3bc990b669e4e0.tar.gz spark-538614acfe95b0c064679122af3bc990b669e4e0.tar.bz2 spark-538614acfe95b0c064679122af3bc990b669e4e0.zip |
Be more aggressive and defensive in select also
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/network/ConnectionManager.scala | 83 |
1 files changed, 55 insertions, 28 deletions
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 2d9b4be4b3..9b00fddd40 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -254,7 +254,32 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } } - val selectedKeysCount = selector.select() + val selectedKeysCount = + try { + selector.select() + } catch { + case e: CancelledKeyException => { + // Some keys within the selectors list are invalid/closed. clear them. + val allKeys = selector.keys().iterator() + + while (allKeys.hasNext()) { + val key = allKeys.next() + try { + if (! key.isValid) { + logInfo("Key not valid ? " + key) + throw new CancelledKeyException() + } + } catch { + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) + } + } + } + } + 0 + } + if (selectedKeysCount == 0) { logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys") } @@ -262,34 +287,36 @@ private[spark] class ConnectionManager(port: Int) extends Logging { logInfo("Selector thread was interrupted!") return } - - val selectedKeys = selector.selectedKeys().iterator() - while (selectedKeys.hasNext()) { - val key = selectedKeys.next - selectedKeys.remove() - try { - if (key.isValid) { - if (key.isAcceptable) { - acceptConnection(key) - } else - if (key.isConnectable) { - triggerConnect(key) - } else - if (key.isReadable) { - triggerRead(key) - } else - if (key.isWritable) { - triggerWrite(key) + + if (0 != selectedKeysCount) { + val selectedKeys = selector.selectedKeys().iterator() + while (selectedKeys.hasNext()) { + val key = selectedKeys.next + selectedKeys.remove() + try { + if (key.isValid) { + if (key.isAcceptable) { + acceptConnection(key) + } else + if (key.isConnectable) { + triggerConnect(key) + } else + if (key.isReadable) { + triggerRead(key) + } else + if (key.isWritable) { + triggerWrite(key) + } + } else { + logInfo("Key not valid ? " + key) + throw new CancelledKeyException() + } + } catch { + // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException. + case e: CancelledKeyException => { + logInfo("key already cancelled ? " + key, e) + triggerForceCloseByException(key, e) } - } else { - logInfo("Key not valid ? " + key) - throw new CancelledKeyException() - } - } catch { - // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException. - case e: CancelledKeyException => { - logInfo("key already cancelled ? " + key, e) - triggerForceCloseByException(key, e) } } } |