From dc47084f4ee173fbd11e8e633ca7955c3259af88 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Thu, 8 Aug 2013 22:19:27 +0530 Subject: Attempt to fix bug reported in PR 791 : a race condition in ConnectionManager and Connection --- core/src/main/scala/spark/network/Connection.scala | 42 ++++++++++++++++++---- .../scala/spark/network/ConnectionManager.scala | 3 +- 2 files changed, 38 insertions(+), 7 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala index b66c00b58c..ded045ee22 100644 --- a/core/src/main/scala/spark/network/Connection.scala +++ b/core/src/main/scala/spark/network/Connection.scala @@ -45,12 +45,15 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, channel.socket.setKeepAlive(true) /*channel.socket.setReceiveBufferSize(32768) */ + @volatile private var closed = false var onCloseCallback: Connection => Unit = null var onExceptionCallback: (Connection, Exception) => Unit = null var onKeyInterestChangeCallback: (Connection, Int) => Unit = null val remoteAddress = getRemoteAddress() + def resetForceReregister(): Boolean + // Read channels typically do not register for write and write does not for read // Now, we do have write registering for read too (temporarily), but this is to detect // channel close NOT to actually read/consume data on it ! 
@@ -95,6 +98,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, } def close() { + closed = true val k = key() if (k != null) { k.cancel() @@ -103,6 +107,8 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, callOnCloseCallback() } + protected def isClosed: Boolean = closed + def onClose(callback: Connection => Unit) { onCloseCallback = callback } @@ -168,7 +174,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, remoteId_ : ConnectionManagerId) extends Connection(SocketChannel.open, selector_, remoteId_) { - class Outbox(fair: Int = 0) { + private class Outbox(fair: Int = 0) { val messages = new Queue[Message]() val defaultChunkSize = 65536 //32768 //16384 var nextMessageToBeUsed = 0 @@ -246,6 +252,13 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } private val outbox = new Outbox(1) + /* + This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly different purpose. + This flag is to see if we need to force reregister for write even when we do not have any pending bytes to write to socket. + This can happen due to a race between adding pending buffers, and checking for existence of data as detailed in + https://github.com/mesos/spark/pull/791#issuecomment-22294729 + */ + private var needForceReregister = false val currentBuffers = new ArrayBuffer[ByteBuffer]() /*channel.socket.setSendBufferSize(256 * 1024)*/ @@ -267,9 +280,19 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, def send(message: Message) { outbox.synchronized { outbox.addMessage(message) - if (channel.isConnected) { - registerInterest() - } + needForceReregister = true + } + if (channel.isConnected) { + registerInterest() + } + } + + // return previous value after resetting it. 
+ def resetForceReregister(): Boolean = { + outbox.synchronized { + val result = needForceReregister + needForceReregister = false + result } } @@ -322,7 +345,10 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, outbox.synchronized { outbox.getChunk() match { case Some(chunk) => { - currentBuffers ++= chunk.buffers + val buffers = chunk.buffers + // If we have 'seen' pending messages, then reset flag - since we handle that as normal registering of event (below) + if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister() + currentBuffers ++= buffers } case None => { // changeConnectionKeyInterest(0) @@ -384,7 +410,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, override def changeInterestForRead(): Boolean = false - override def changeInterestForWrite(): Boolean = true + override def changeInterestForWrite(): Boolean = ! isClosed } @@ -534,6 +560,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback} + // override def changeInterestForRead(): Boolean = ! isClosed override def changeInterestForRead(): Boolean = true override def changeInterestForWrite(): Boolean = { @@ -549,4 +576,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S override def unregisterInterest() { changeConnectionKeyInterest(0) } + + // For read conn, always false. 
+ override def resetForceReregister(): Boolean = false } diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 6c4e7dc03e..8b9f3ae18c 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -123,7 +123,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } finally { writeRunnableStarted.synchronized { writeRunnableStarted -= key - if (register && conn.changeInterestForWrite()) { + val needReregister = register || conn.resetForceReregister() + if (needReregister && conn.changeInterestForWrite()) { conn.registerInterest() } } -- cgit v1.2.3