diff options
author | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-10-26 21:16:37 -0700 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-10-26 21:16:37 -0700 |
commit | 8587353b3e5dc7190fcb64dd162b6183da1208c2 (patch) | |
tree | 3387677fa5fdb605b97850ed2f297a0dae36fade | |
parent | d70f3549ff166e0f7da3d63ff3ce195f13c9f7b5 (diff) | |
download | spark-8587353b3e5dc7190fcb64dd162b6183da1208c2.tar.gz spark-8587353b3e5dc7190fcb64dd162b6183da1208c2.tar.bz2 spark-8587353b3e5dc7190fcb64dd162b6183da1208c2.zip |
Added some tuning parameters.
-rw-r--r-- | conf/java-opts | 2 | ||||
-rw-r--r-- | src/scala/spark/Broadcast.scala | 113 |
2 files changed, 72 insertions, 43 deletions
diff --git a/conf/java-opts b/conf/java-opts index c7fa245f47..f279d27806 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=256 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 +-Dspark.broadcast.MasterHostAddress=127.0.0.1 -Dspark.broadcast.MasterTrackerPort=11111 -Dspark.broadcast.BlockSize=256 -Dspark.broadcast.MaxRetryCount=2 -Dspark.broadcast.ServerSocketTimout=50000 diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 83849569b4..45a3685c7c 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -2,7 +2,7 @@ package spark import java.io._ import java.net._ -import java.util.{BitSet, Comparator, Timer, TimerTask, UUID} +import java.util.{BitSet, Comparator, Random, Timer, TimerTask, UUID} import com.google.common.collect.MapMaker @@ -77,7 +77,7 @@ extends BroadcastRecipe with Logging { hasCopyInHDFS = true // Create a variableInfo object and store it in valueInfos - var variableInfo = blockifyObject (value_, BroadcastBT.blockSize) + var variableInfo = blockifyObject (value_, BroadcastBT.BlockSize) // Prepare the value being broadcasted // TODO: Refactoring and clean-up required here @@ -227,7 +227,7 @@ extends BroadcastRecipe with Logging { var retByteArray = new Array[Byte] (totalBytes) for (i <- 0 until totalBlocks) { System.arraycopy (arrayOfBlocks(i).byteArray, 0, retByteArray, - i * BroadcastBT.blockSize, arrayOfBlocks(i).byteArray.length) + i * BroadcastBT.BlockSize, arrayOfBlocks(i).byteArray.length) } byteArrayToObject (retByteArray) } @@ -306,9 +306,9 @@ extends BroadcastRecipe with Logging { oosGuide.close clientSocketToGuide.close - // TODO: DO NOT use constant sleep value here - // TODO: Guide should send back a backoff time, somehow - Thread.sleep (100) + Thread.sleep ( BroadcastBT.ranGen.nextInt ( + BroadcastBT.MaxKnockInterval - BroadcastBT.MinKnockInterval) + + BroadcastBT.MinKnockInterval) } } } @@ -321,12 +321,12 @@ extends BroadcastRecipe with Logging { var gInfo: SourceInfo = SourceInfo ("", SourceInfo.TxOverGoToHDFS, SourceInfo.UnusedParam, SourceInfo.UnusedParam) - var retriesLeft = BroadcastBT.maxRetryCount + var retriesLeft = BroadcastBT.MaxRetryCount do { try { // Connect to the tracker to find out GuideInfo val clientSocketToTracker = - new Socket(BroadcastBT.masterHostAddress, BroadcastBT.masterTrackerPort) + new Socket(BroadcastBT.MasterHostAddress, BroadcastBT.MasterTrackerPort) val oosTracker = new ObjectOutputStream (clientSocketToTracker.getOutputStream) oosTracker.flush @@ -350,9 +350,12 @@ extends BroadcastRecipe with Logging { clientSocketToTracker.close } } - retriesLeft -= 1 - // TODO: Should wait before retrying. Implement wait function. - Thread.sleep (1234) + + Thread.sleep ( BroadcastBT.ranGen.nextInt ( + BroadcastBT.MaxKnockInterval - BroadcastBT.MinKnockInterval) + + BroadcastBT.MinKnockInterval) + + retriesLeft -= 1 } while (retriesLeft > 0 && gInfo.listenPort == SourceInfo.TxNotStartedRetry) logInfo ("Got this guidePort from Tracker: " + gInfo.listenPort) @@ -596,7 +599,7 @@ extends BroadcastRecipe with Logging { while (keepAccepting || !hasCopyInHDFS) { var clientSocket: Socket = null try { - serverSocket.setSoTimeout (BroadcastBT.serverSocketTimout) + serverSocket.setSoTimeout (BroadcastBT.ServerSocketTimout) clientSocket = serverSocket.accept } catch { case e: Exception => { @@ -711,7 +714,7 @@ extends BroadcastRecipe with Logging { while (keepAccepting) { var clientSocket: Socket = null try { - serverSocket.setSoTimeout (BroadcastBT.serverSocketTimout) + serverSocket.setSoTimeout (BroadcastBT.ServerSocketTimout) clientSocket = serverSocket.accept } catch { case e: Exception => { @@ -740,9 +743,6 @@ extends BroadcastRecipe with Logging { class ServeSingleRequest (val clientSocket: Socket) extends Thread with Logging { - // TODO: This has to be fixed somehow - private val MAX_MILLIS_TO_CHAT = 50 - private val oos = new ObjectOutputStream (clientSocket.getOutputStream) oos.flush private val ois = new ObjectInputStream (clientSocket.getInputStream) @@ -775,14 +775,17 @@ extends BroadcastRecipe with Logging { val startTime = System.currentTimeMillis var curTime = startTime var keepSending = true + var blocksToSend = BroadcastBT.MaxChatBlocks - while (keepSending && curTime - startTime < MAX_MILLIS_TO_CHAT) { + while (keepSending && blocksToSend > 0 && + (curTime - startTime) < BroadcastBT.MaxChatTime) { val sentBlock = pickAndSendBlock (rxSourceInfo.hasBlocksBitVector) if (sentBlock < 0) { keepSending = false } else { rxSourceInfo.hasBlocksBitVector.set (sentBlock) } + blocksToSend = blocksToSend - 1 curTime = System.currentTimeMillis } } catch { @@ -808,8 +811,6 @@ extends BroadcastRecipe with Logging { var minCopies = Int.MaxValue var nextIndex = -1 - logInfo ("Picking a block to send...") - // Figure out which blocks the receiver doesn't have var tempHasBlocksBitVector = rxHasBlocksBitVector.clone.asInstanceOf[BitSet] @@ -945,16 +946,19 @@ extends Logging { var valueToGuideMap = Map[UUID, SourceInfo] () - var sourceToSpeedMap = Map[String, Double] () +// var sourceToSpeedMap = Map[String, Double] () + + // Random number generator + var ranGen = new Random private var initialized = false private var isMaster_ = false - private var masterHostAddress_ = "127.0.0.1" - private var masterTrackerPort_ : Int = 11111 - private var blockSize_ : Int = 512 * 1024 - private var maxRetryCount_ : Int = 2 - private var serverSocketTimout_ : Int = 50000 + private var MasterHostAddress_ = "127.0.0.1" + private var MasterTrackerPort_ : Int = 11111 + private var BlockSize_ : Int = 512 * 1024 + private var MaxRetryCount_ : Int = 2 + private var ServerSocketTimout_ : Int = 50000 private var trackMV: TrackMultipleValues = null @@ -963,24 +967,38 @@ extends Logging { // 125.0 MBps = 1 Gbps link private val MaxMBps_ = 125.0 + // A peer syncs back to Guide after waiting randomly within following limits + private var MinKnockInterval_ = 500 + private var MaxKnockInterval_ = 999 + private var MaxPeersInGuideResponse_ = 4 + // Maximum number of receiving and sending threads of a peer private var MaxRxPeers_ = 4 private var MaxTxPeers_ = 4 + + // Peers can char at most this milliseconds or transfer this number of blocks + private var MaxChatTime_ = 250 + private var MaxChatBlocks_ = 1024 def initialize (isMaster__ : Boolean) { synchronized { if (!initialized) { - masterHostAddress_ = - System.getProperty ("spark.broadcast.masterHostAddress", "127.0.0.1") - masterTrackerPort_ = - System.getProperty ("spark.broadcast.masterTrackerPort", "11111").toInt - blockSize_ = - System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024 - maxRetryCount_ = - System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt - serverSocketTimout_ = - System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt + MasterHostAddress_ = + System.getProperty ("spark.broadcast.MasterHostAddress", "127.0.0.1") + MasterTrackerPort_ = + System.getProperty ("spark.broadcast.MasterTrackerPort", "11111").toInt + BlockSize_ = + System.getProperty ("spark.broadcast.BlockSize", "512").toInt * 1024 + MaxRetryCount_ = + System.getProperty ("spark.broadcast.MaxRetryCount", "2").toInt + ServerSocketTimout_ = + System.getProperty ("spark.broadcast.ServerSocketTimout", "50000").toInt + + MinKnockInterval_ = + System.getProperty ("spark.broadcast.MinKnockInterval", "100").toInt + MaxKnockInterval_ = + System.getProperty ("spark.broadcast.MaxKnockInterval", "999").toInt MaxPeersInGuideResponse_ = System.getProperty ("spark.broadcast.MaxPeersInGuideResponse", "4").toInt @@ -990,6 +1008,11 @@ extends Logging { MaxTxPeers_ = System.getProperty ("spark.broadcast.MaxTxPeers", "4").toInt + MaxChatTime_ = + System.getProperty ("spark.broadcast.MaxChatTime", "4").toInt + MaxChatBlocks_ = + System.getProperty ("spark.broadcast.MaxChatBlocks", "4").toInt + isMaster_ = isMaster__ if (isMaster) { @@ -1004,21 +1027,27 @@ extends Logging { } } - def masterHostAddress = masterHostAddress_ - def masterTrackerPort = masterTrackerPort_ - def blockSize = blockSize_ - def maxRetryCount = maxRetryCount_ - def serverSocketTimout = serverSocketTimout_ + def MasterHostAddress = MasterHostAddress_ + def MasterTrackerPort = MasterTrackerPort_ + def BlockSize = BlockSize_ + def MaxRetryCount = MaxRetryCount_ + def ServerSocketTimout = ServerSocketTimout_ def isMaster = isMaster_ def MaxMBps = MaxMBps_ + def MinKnockInterval = MinKnockInterval_ + def MaxKnockInterval = MaxKnockInterval_ + def MaxPeersInGuideResponse = MaxPeersInGuideResponse_ def MaxRxPeers = MaxRxPeers_ def MaxTxPeers = MaxTxPeers_ + def MaxChatTime = MaxChatTime_ + def MaxChatBlocks = MaxChatBlocks_ + def registerValue (uuid: UUID, gInfo: SourceInfo) = { valueToGuideMap.synchronized { valueToGuideMap += (uuid -> gInfo) @@ -1059,7 +1088,7 @@ extends Logging { var threadPool = Executors.newCachedThreadPool var serverSocket: ServerSocket = null - serverSocket = new ServerSocket (BroadcastBT.masterTrackerPort) + serverSocket = new ServerSocket (BroadcastBT.MasterTrackerPort) logInfo ("TrackMultipleValues" + serverSocket) try { @@ -1067,7 +1096,7 @@ extends Logging { var clientSocket: Socket = null try { // TODO: - serverSocket.setSoTimeout (serverSocketTimout) + serverSocket.setSoTimeout (ServerSocketTimout) clientSocket = serverSocket.accept } catch { case e: Exception => { |