aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/network/Connection.scala36
1 files changed, 2 insertions, 34 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index f2e3c1a14e..8219a185ea 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -171,7 +171,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
remoteId_ : ConnectionManagerId)
extends Connection(SocketChannel.open, selector_, remoteId_) {
- private class Outbox(fair: Int = 0) {
+ private class Outbox {
val messages = new Queue[Message]()
val defaultChunkSize = 65536 //32768 //16384
var nextMessageToBeUsed = 0
@@ -186,38 +186,6 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
def getChunk(): Option[MessageChunk] = {
- fair match {
- case 0 => getChunkFIFO()
- case 1 => getChunkRR()
- case _ => throw new Exception("Unexpected fairness policy in outbox")
- }
- }
-
- private def getChunkFIFO(): Option[MessageChunk] = {
- /*logInfo("Using FIFO")*/
- messages.synchronized {
- while (!messages.isEmpty) {
- val message = messages(0)
- val chunk = message.getChunkForSending(defaultChunkSize)
- if (chunk.isDefined) {
- messages += message // this is probably incorrect, it wont work as fifo
- if (!message.started) {
- logDebug("Starting to send [" + message + "]")
- message.started = true
- message.startTime = System.currentTimeMillis
- }
- return chunk
- } else {
- message.finishTime = System.currentTimeMillis
- logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() +
- "] in " + message.timeTaken )
- }
- }
- }
- None
- }
-
- private def getChunkRR(): Option[MessageChunk] = {
messages.synchronized {
while (!messages.isEmpty) {
/*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
@@ -249,7 +217,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
// outbox is used as a lock - ensure that it is always used as a leaf (since methods which
// lock it are invoked in context of other locks)
- private val outbox = new Outbox(1)
+ private val outbox = new Outbox()
/*
This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly
different purpose. This flag is to see if we need to force reregister for write even when we