author     Mridul Muralidharan <mridul@gmail.com>  2013-08-08 22:19:27 +0530
committer  Mridul Muralidharan <mridul@gmail.com>  2013-08-08 22:19:27 +0530
commit     dc47084f4ee173fbd11e8e633ca7955c3259af88 (patch)
tree       4c234210fc7dbd3505bd9353b746a2c33a654b0d /core
parent     5133e4bebd47d8ae089f967689ecab551c2c5844 (diff)
Attempt to fix bug reported in PR 791: a race condition in ConnectionManager and Connection
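
The race, roughly: SendingConnection.send() queues a message and asks for write interest, while an in-flight write runnable in ConnectionManager may already have drained the outbox, concluded there is nothing left to write, and therefore skipped re-registration in its cleanup - stranding the message queued in between. A minimal standalone sketch of that check-then-act window (a hypothetical reduction in Scala; names such as RaceSketch and onWriteDrained are illustrative, not from this patch):

import scala.collection.mutable.Queue

object RaceSketch {
  private val outbox = new Queue[String]()
  @volatile private var writeInterestRegistered = false

  // Application thread: queue a message, then ask for write interest.
  def send(msg: String): Unit = {
    outbox.synchronized { outbox.enqueue(msg) }
    // Window: a selector thread that sampled an empty outbox before our
    // enqueue can clear this flag after we set it, stranding the message.
    writeInterestRegistered = true
  }

  // Selector thread: after draining, keep interest only if data remains.
  def onWriteDrained(): Unit = {
    val pending = outbox.synchronized { outbox.nonEmpty }
    if (!pending) writeInterestRegistered = false // lost wakeup if send() raced us
  }
}

The patch below closes the window by setting a needForceReregister flag inside the same critical section that enqueues the message, and having the write runnable consume it (resetForceReregister) before deciding to drop write interest.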
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/network/Connection.scala        | 42
-rw-r--r--  core/src/main/scala/spark/network/ConnectionManager.scala |  3
2 files changed, 38 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index b66c00b58c..ded045ee22 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -45,12 +45,15 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
channel.socket.setKeepAlive(true)
/*channel.socket.setReceiveBufferSize(32768) */
+ @volatile private var closed = false
var onCloseCallback: Connection => Unit = null
var onExceptionCallback: (Connection, Exception) => Unit = null
var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
val remoteAddress = getRemoteAddress()
+ def resetForceReregister(): Boolean
+
// Read channels typically do not register for write and write does not for read
// Now, we do have write registering for read too (temporarily), but this is to detect
// channel close NOT to actually read/consume data on it !
@@ -95,6 +98,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
}
def close() {
+ closed = true
val k = key()
if (k != null) {
k.cancel()
@@ -103,6 +107,8 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
callOnCloseCallback()
}
+ protected def isClosed: Boolean = closed
+
def onClose(callback: Connection => Unit) {
onCloseCallback = callback
}
@@ -168,7 +174,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
remoteId_ : ConnectionManagerId)
extends Connection(SocketChannel.open, selector_, remoteId_) {
- class Outbox(fair: Int = 0) {
+ private class Outbox(fair: Int = 0) {
val messages = new Queue[Message]()
val defaultChunkSize = 65536 //32768 //16384
var nextMessageToBeUsed = 0
@@ -246,6 +252,13 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
private val outbox = new Outbox(1)
+ /*
+ This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly different purpose.
+ This flag is to see if we need to force reregister for write even when we do not have any pending bytes to write to the socket.
+ This can happen due to a race between adding pending buffers and checking for the existence of data, as detailed in
+ https://github.com/mesos/spark/pull/791#issuecomment-22294729
+ */
+ private var needForceReregister = false
val currentBuffers = new ArrayBuffer[ByteBuffer]()
/*channel.socket.setSendBufferSize(256 * 1024)*/
@@ -267,9 +280,19 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
def send(message: Message) {
outbox.synchronized {
outbox.addMessage(message)
- if (channel.isConnected) {
- registerInterest()
- }
+ needForceReregister = true
+ }
+ if (channel.isConnected) {
+ registerInterest()
+ }
+ }
+
+ // Return the previous value after resetting it.
+ def resetForceReregister(): Boolean = {
+ outbox.synchronized {
+ val result = needForceReregister
+ needForceReregister = false
+ result
}
}
@@ -322,7 +345,10 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
outbox.synchronized {
outbox.getChunk() match {
case Some(chunk) => {
- currentBuffers ++= chunk.buffers
+ val buffers = chunk.buffers
+ // If we have 'seen' pending messages, then reset the flag - since we handle that as the normal registering of the event (below)
+ if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister()
+ currentBuffers ++= buffers
}
case None => {
// changeConnectionKeyInterest(0)
@@ -384,7 +410,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
override def changeInterestForRead(): Boolean = false
- override def changeInterestForWrite(): Boolean = true
+ override def changeInterestForWrite(): Boolean = ! isClosed
}
@@ -534,6 +560,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback}
+ // override def changeInterestForRead(): Boolean = ! isClosed
override def changeInterestForRead(): Boolean = true
override def changeInterestForWrite(): Boolean = {
@@ -549,4 +576,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
override def unregisterInterest() {
changeConnectionKeyInterest(0)
}
+
+ // For a read connection, always false.
+ override def resetForceReregister(): Boolean = false
}
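
The core of the Connection.scala change above is a test-and-clear flag guarded by the outbox lock: send() sets needForceReregister in the same critical section that enqueues the message, and resetForceReregister() atomically reads and clears it. A standalone sketch of the pattern (the names mirror the patch, but this class is an illustration, not the patched file):

import scala.collection.mutable.Queue

class OutboxSketch {
  private val messages = new Queue[String]()
  private var needForceReregister = false

  // Enqueue and set the flag under one lock, so a reader that clears
  // the flag under the same lock cannot miss a message added here.
  def add(msg: String): Unit = messages.synchronized {
    messages.enqueue(msg)
    needForceReregister = true
  }

  // Atomic test-and-clear: returns true at most once per set.
  def resetForceReregister(): Boolean = messages.synchronized {
    val result = needForceReregister
    needForceReregister = false
    result
  }
}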
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 6c4e7dc03e..8b9f3ae18c 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -123,7 +123,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
} finally {
writeRunnableStarted.synchronized {
writeRunnableStarted -= key
- if (register && conn.changeInterestForWrite()) {
+ val needReregister = register || conn.resetForceReregister()
+ if (needReregister && conn.changeInterestForWrite()) {
conn.registerInterest()
}
}
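
One subtlety worth noting in the ConnectionManager change: Scala's || short-circuits, so conn.resetForceReregister() runs - and the flag is cleared - only when register is false. When register is already true the connection gets re-registered anyway and the force flag survives for a later pass, which at worst costs one spurious re-registration. A tiny hypothetical stub demonstrating that behaviour (not code from the patch):

object ShortCircuitSketch extends App {
  private var flag = true
  private def resetForceReregister(): Boolean = {
    val result = flag; flag = false; result
  }

  val a = true || resetForceReregister()  // short-circuit: flag untouched
  assert(flag)                            // force flag survives for the next pass

  val b = false || resetForceReregister() // flag consumed exactly when needed
  assert(b && !flag)
}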