diff options
author | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-11-29 11:20:04 -0800 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-11-29 11:20:04 -0800 |
commit | 8d775448c221e490dc369a1c23eeda629140c419 (patch) | |
tree | 4fb6a919d4dd12e14a781708d4be254f13f492f8 | |
parent | 830496b901d0abd99f6413015e8ad1d70efb724e (diff) | |
download | spark-8d775448c221e490dc369a1c23eeda629140c419.tar.gz spark-8d775448c221e490dc369a1c23eeda629140c419.tar.bz2 spark-8d775448c221e490dc369a1c23eeda629140c419.zip |
- Resolved some of the simpler TODOs related to different timeout and wait periods.
- Removed unused code.
-rw-r--r-- | src/scala/spark/Broadcast.scala | 80 |
1 files changed, 14 insertions, 66 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 432fc5bb4c..9e6355782f 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -435,7 +435,7 @@ extends BroadcastRecipe with Logging { // TODO: Must fix this. This might never break if broadcast fails. // We should be able to break and send false. Also need to kill threads while (hasBlocks < totalBlocks) { - Thread.sleep(1000) + Thread.sleep(BroadcastBT.MaxKnockInterval) } return true @@ -473,14 +473,13 @@ extends BroadcastRecipe with Logging { } // Sleep for a while before starting some more threads - // TODO: Whats up with this? - Thread.sleep (500) + Thread.sleep (BroadcastBT.MinKnockInterval) } // Shutdown the thread pool threadPool.shutdown } - // TODO: Right now picking the one that has the most blocks this peer wants + // Right now picking the one that has the most blocks this peer wants // Also picking peer randomly if no one has anything interesting private def pickPeerToTalkTo: SourceInfo = { var curPeer: SourceInfo = null @@ -509,7 +508,7 @@ extends BroadcastRecipe with Logging { } // Always pick randomly or randomly pick randomly? - // TODO: Now its always + // Now always picking randomly if (curPeer == null && peersNotInUse.size > 0) { // Pick uniformly the i'th required peer var i = BroadcastBT.ranGen.nextInt (peersNotInUse.size) @@ -548,9 +547,8 @@ extends BroadcastRecipe with Logging { } } - // TODO: Fix a value for timeout timer var timeOutTimer = new Timer - timeOutTimer.schedule (timeOutTask, 1 * 1000) + timeOutTimer.schedule (timeOutTask, BroadcastBT.MaxKnockInterval) logInfo ("TalkToPeer started... => " + peerToTalkTo) @@ -638,13 +636,13 @@ extends BroadcastRecipe with Logging { case eofe: java.io.EOFException => { } case e: Exception => { logInfo ("TalktoPeer had a " + e) - // Remove this pInfo from listOfSources - // TODO: We probably should have the following in some form, but not + // TODO: Remove 'newPeerToTalkTo' from listOfSources + // We probably should have the following in some form, but not // really here. This exception can happen if the sender just breaks connection -// listOfSources.synchronized { -// logInfo ("Exception in TalkToPeer. Removing source: " + peerToTalkTo) -// listOfSources = listOfSources - peerToTalkTo -// } + // listOfSources.synchronized { + // logInfo ("Exception in TalkToPeer. Removing source: " + peerToTalkTo) + // listOfSources = listOfSources - peerToTalkTo + // } } } finally { // blockToAskFor != -1 => there was an exception @@ -918,8 +916,7 @@ extends BroadcastRecipe with Logging { class ServeMultipleRequests extends Thread with Logging { override def run: Unit = { - // TODO: Not sure if this will be able to fix the number of outgoing links - // We should have a timeout mechanism on the receiver side + // Server at most BroadcastBT.MaxRxPeers peers var threadPool = BroadcastBT.newDaemonFixedThreadPool(BroadcastBT.MaxRxPeers) @@ -1006,13 +1003,6 @@ extends BroadcastRecipe with Logging { sendBlock (blockToSend) rxSourceInfo.hasBlocksBitVector.set (blockToSend) -// val sentBlock = pickAndSendBlock (rxSourceInfo.hasBlocksBitVector) -// if (sentBlock < 0) { -// keepSending = false -// } else { -// rxSourceInfo.hasBlocksBitVector.set (sentBlock) -// } - numBlocksToSend = numBlocksToSend - 1 // Receive latest SourceInfo from the receiver @@ -1023,7 +1013,6 @@ extends BroadcastRecipe with Logging { curTime = System.currentTimeMillis } } catch { - // TODO: Need to add better exception handling here // If something went wrong, e.g., the worker at the other end died etc. // then close everything up // Exception can happen if the receiver stops receiving @@ -1045,46 +1034,11 @@ extends BroadcastRecipe with Logging { oos.flush } catch { case e: Exception => { - logInfo ("pickAndSendBlock had a " + e) + logInfo ("sendBlock had a " + e) } } logInfo ("Sent block: " + blockToSend + " to " + clientSocket) } - - // Right now picking the rarest first block - private def pickAndSendBlock (rxHasBlocksBitVector: BitSet): Int = { - var blockIndex = -1 - var minCopies = Int.MaxValue - var nextIndex = -1 - - // Figure out which blocks the receiver doesn't have - var tempHasBlocksBitVector = - rxHasBlocksBitVector.clone.asInstanceOf[BitSet] - tempHasBlocksBitVector.flip (0, tempHasBlocksBitVector.size) - hasBlocksBitVector.synchronized { - tempHasBlocksBitVector.and (hasBlocksBitVector) - } - - // Traverse over all the blocks - numCopiesSent.synchronized { - do { - nextIndex = tempHasBlocksBitVector.nextSetBit(nextIndex + 1) - if (nextIndex != -1 && numCopiesSent(nextIndex) < minCopies) { - minCopies = numCopiesSent(nextIndex) - numCopiesSent(nextIndex) = numCopiesSent(nextIndex) + 1 - blockIndex = nextIndex - } - } while (nextIndex != -1) - } - - if (blockIndex < 0) { - logInfo ("No block to send...") - } else { - sendBlock (blockIndex) - } - - return blockIndex - } } } } @@ -1231,12 +1185,8 @@ extends Logging { private var trackMV: TrackMultipleValues = null - // newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * curSpeed - private val ALPHA = 0.7 - // 125.0 MBps = 1 Gbps link - private val MaxMBps_ = 125.0 - // A peer syncs back to Guide after waiting randomly within following limits + // Also used thoughout the code for small and large waits/timeouts private var MinKnockInterval_ = 500 private var MaxKnockInterval_ = 999 @@ -1316,8 +1266,6 @@ extends Logging { def isMaster = isMaster_ - def MaxMBps = MaxMBps_ - def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ |