aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMridul Muralidharan <mridul@gmail.com>2013-08-08 22:28:40 +0530
committerMridul Muralidharan <mridul@gmail.com>2013-08-08 22:28:40 +0530
commitc230ca3b4e99bf2a6c06b97723f47d5225003036 (patch)
treedf38156b6da783e094cc174a4e8333b24b984d54 /core
parentdc47084f4ee173fbd11e8e633ca7955c3259af88 (diff)
downloadspark-c230ca3b4e99bf2a6c06b97723f47d5225003036.tar.gz
spark-c230ca3b4e99bf2a6c06b97723f47d5225003036.tar.bz2
spark-c230ca3b4e99bf2a6c06b97723f47d5225003036.zip
Change line size
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/network/Connection.scala14
1 files changed, 9 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index ded045ee22..1e571d39ae 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -251,12 +251,15 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
}
+ // outbox is used as a lock - ensure that it is always used as a leaf (since methods which
+ // lock it are invoked in context of other locks)
private val outbox = new Outbox(1)
/*
- This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly different purpose.
- This flag is to see if we need to force reregister for write even when we do not have any pending bytes to write to socket.
- This can happen due to a race between adding pending buffers, and checking for existing of data as detailed in
- https://github.com/mesos/spark/pull/791#issuecomment-22294729
+ This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly
+ different purpose. This flag is to see if we need to force reregister for write even when we
+ do not have any pending bytes to write to socket.
+ This can happen due to a race between adding pending buffers, and checking for existing of
+ data as detailed in https://github.com/mesos/spark/pull/791
*/
private var needForceReregister = false
val currentBuffers = new ArrayBuffer[ByteBuffer]()
@@ -346,7 +349,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
outbox.getChunk() match {
case Some(chunk) => {
val buffers = chunk.buffers
- // If we have 'seen' pending messages, then reset flag - since we handle that as normal registering of event (below)
+ // If we have 'seen' pending messages, then reset flag - since we handle that as normal
+ // registering of event (below)
if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister()
currentBuffers ++= buffers
}