diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-10 10:21:20 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-08-10 10:21:20 -0700 |
commit | 71c63de22fea19fcd3f6a847dfd3d7b6ab597eac (patch) | |
tree | 4dc441892a3e6f6ed8d0f692b537c209386ff69f | |
parent | d3277a0daf300ce2bb074e1a779300c1d811bfc5 (diff) | |
parent | c230ca3b4e99bf2a6c06b97723f47d5225003036 (diff) | |
download | spark-71c63de22fea19fcd3f6a847dfd3d7b6ab597eac.tar.gz spark-71c63de22fea19fcd3f6a847dfd3d7b6ab597eac.tar.bz2 spark-71c63de22fea19fcd3f6a847dfd3d7b6ab597eac.zip |
Merge pull request #795 from mridulm/master
Fix bug reported in PR 791 : a race condition in ConnectionManager and Connection
-rw-r--r-- | core/src/main/scala/spark/network/Connection.scala | 46 | ||||
-rw-r--r-- | core/src/main/scala/spark/network/ConnectionManager.scala | 3 |
2 files changed, 42 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala index b66c00b58c..1e571d39ae 100644 --- a/core/src/main/scala/spark/network/Connection.scala +++ b/core/src/main/scala/spark/network/Connection.scala @@ -45,12 +45,15 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, channel.socket.setKeepAlive(true) /*channel.socket.setReceiveBufferSize(32768) */ + @volatile private var closed = false var onCloseCallback: Connection => Unit = null var onExceptionCallback: (Connection, Exception) => Unit = null var onKeyInterestChangeCallback: (Connection, Int) => Unit = null val remoteAddress = getRemoteAddress() + def resetForceReregister(): Boolean + // Read channels typically do not register for write and write does not for read // Now, we do have write registering for read too (temporarily), but this is to detect // channel close NOT to actually read/consume data on it ! @@ -95,6 +98,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, } def close() { + closed = true val k = key() if (k != null) { k.cancel() @@ -103,6 +107,8 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, callOnCloseCallback() } + protected def isClosed: Boolean = closed + def onClose(callback: Connection => Unit) { onCloseCallback = callback } @@ -168,7 +174,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, remoteId_ : ConnectionManagerId) extends Connection(SocketChannel.open, selector_, remoteId_) { - class Outbox(fair: Int = 0) { + private class Outbox(fair: Int = 0) { val messages = new Queue[Message]() val defaultChunkSize = 65536 //32768 //16384 var nextMessageToBeUsed = 0 @@ -245,7 +251,17 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } } + // outbox is used as a lock - ensure that it is always used as a leaf (since methods which + // lock it are invoked in context of other locks) private val outbox = new Outbox(1) + /* + This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly + different purpose. This flag is to see if we need to force reregister for write even when we + do not have any pending bytes to write to socket. + This can happen due to a race between adding pending buffers, and checking for existing of + data as detailed in https://github.com/mesos/spark/pull/791 + */ + private var needForceReregister = false val currentBuffers = new ArrayBuffer[ByteBuffer]() /*channel.socket.setSendBufferSize(256 * 1024)*/ @@ -267,9 +283,19 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, def send(message: Message) { outbox.synchronized { outbox.addMessage(message) - if (channel.isConnected) { - registerInterest() - } + needForceReregister = true + } + if (channel.isConnected) { + registerInterest() + } + } + + // return previous value after resetting it. + def resetForceReregister(): Boolean = { + outbox.synchronized { + val result = needForceReregister + needForceReregister = false + result } } @@ -322,7 +348,11 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, outbox.synchronized { outbox.getChunk() match { case Some(chunk) => { - currentBuffers ++= chunk.buffers + val buffers = chunk.buffers + // If we have 'seen' pending messages, then reset flag - since we handle that as normal + // registering of event (below) + if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister() + currentBuffers ++= buffers } case None => { // changeConnectionKeyInterest(0) @@ -384,7 +414,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, override def changeInterestForRead(): Boolean = false - override def changeInterestForWrite(): Boolean = true + override def changeInterestForWrite(): Boolean = ! isClosed } @@ -534,6 +564,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback} + // override def changeInterestForRead(): Boolean = ! isClosed override def changeInterestForRead(): Boolean = true override def changeInterestForWrite(): Boolean = { @@ -549,4 +580,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S override def unregisterInterest() { changeConnectionKeyInterest(0) } + + // For read conn, always false. + override def resetForceReregister(): Boolean = false } diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 6c4e7dc03e..8b9f3ae18c 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -123,7 +123,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging { } finally { writeRunnableStarted.synchronized { writeRunnableStarted -= key - if (register && conn.changeInterestForWrite()) { + val needReregister = register || conn.resetForceReregister() + if (needReregister && conn.changeInterestForWrite()) { conn.registerInterest() } } |