aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCharles Reiss <charles@eecs.berkeley.edu>2013-02-22 15:16:03 -0800
committerCharles Reiss <charles@eecs.berkeley.edu>2013-02-22 16:11:52 -0800
commitc8a788692185326c001233bb249d2ed046cd7319 (patch)
treea65947643b0fdac06263d3f58ca79938ff1fe49d /core
parent05bc02e80be78d83937bf57f726946e297d0dd08 (diff)
downloadspark-c8a788692185326c001233bb249d2ed046cd7319.tar.gz
spark-c8a788692185326c001233bb249d2ed046cd7319.tar.bz2
spark-c8a788692185326c001233bb249d2ed046cd7319.zip
Detect when SendingConnections drop by trying to read them.
Comment fix
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/network/Connection.scala24
1 files changed, 20 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index cd5b7d57f3..d1451bc212 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -198,7 +198,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
outbox.synchronized {
outbox.addMessage(message)
if (channel.isConnected) {
- changeConnectionKeyInterest(SelectionKey.OP_WRITE)
+ changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ)
}
}
}
@@ -219,7 +219,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
def finishConnect() {
try {
channel.finishConnect
- changeConnectionKeyInterest(SelectionKey.OP_WRITE)
+ changeConnectionKeyInterest(SelectionKey.OP_WRITE | SelectionKey.OP_READ)
logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
} catch {
case e: Exception => {
@@ -239,8 +239,7 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
currentBuffers ++= chunk.buffers
}
case None => {
- changeConnectionKeyInterest(0)
- /*key.interestOps(0)*/
+ changeConnectionKeyInterest(SelectionKey.OP_READ)
return
}
}
@@ -267,6 +266,23 @@ extends Connection(SocketChannel.open, selector_, remoteId_) {
}
}
}
+
+ override def read() {
+ // We don't expect the other side to send anything; so, we just read to detect an error or EOF.
+ try {
+ val length = channel.read(ByteBuffer.allocate(1))
+ if (length == -1) { // EOF
+ close()
+ } else if (length > 0) {
+ logWarning("Unexpected data read from SendingConnection to " + remoteConnectionManagerId)
+ }
+ } catch {
+ case e: Exception =>
+ logError("Exception while reading SendingConnection to " + remoteConnectionManagerId, e)
+ callOnExceptionCallback(e)
+ close()
+ }
+ }
}