diff options
author | Charles Reiss <charles@eecs.berkeley.edu> | 2013-02-22 15:16:03 -0800 |
---|---|---|
committer | Charles Reiss <charles@eecs.berkeley.edu> | 2013-02-22 16:11:52 -0800 |
commit | c8a788692185326c001233bb249d2ed046cd7319 (patch) | |
tree | a65947643b0fdac06263d3f58ca79938ff1fe49d /core/src | |
parent | 05bc02e80be78d83937bf57f726946e297d0dd08 (diff) | |
download | spark-c8a788692185326c001233bb249d2ed046cd7319.tar.gz spark-c8a788692185326c001233bb249d2ed046cd7319.tar.bz2 spark-c8a788692185326c001233bb249d2ed046cd7319.zip |
Detect when SendingConnections drop by trying to read them.
Comment fix
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/network/Connection.scala | 24 |
1 files changed, 20 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala index cd5b7d57f3..d1451bc212 100644 --- a/core/src/main/scala/spark/network/Connection.scala +++ b/core/src/main/scala/spark/network/Connection.scala @@ -198,7 +198,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { outbox.synchronized { outbox.addMessage(message) if (channel.isConnected) { - changeConnectionKeyInterest(SelectionKey.OP_WRITE) + changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ) } } } @@ -219,7 +219,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { def finishConnect() { try { channel.finishConnect - changeConnectionKeyInterest(SelectionKey.OP_WRITE) + changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ) logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending") } catch { case e: Exception => { @@ -239,8 +239,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { currentBuffers ++= chunk.buffers } case None => { - changeConnectionKeyInterest(0) - /*key.interestOps(0)*/ + changeConnectionKeyInterest(SelectionKey.OP_READ) return } } @@ -267,6 +266,23 @@ extends Connection(SocketChannel.open, selector_, remoteId_) { } } } + + override def read() { + // We don't expect the other side to send anything; so, we just read to detect an error or EOF. + try { + val length = channel.read(ByteBuffer.allocate(1)) + if (length == -1) { // EOF + close() + } else if (length > 0) { + logWarning("Unexpected data read from SendingConnection to " + remoteConnectionManagerId) + } + } catch { + case e: Exception => + logError("Exception while reading SendingConnection to " + remoteConnectionManagerId, e) + callOnExceptionCallback(e) + close() + } + } } |