aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-29 11:20:04 -0800
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-29 11:20:04 -0800
commit8d775448c221e490dc369a1c23eeda629140c419 (patch)
tree4fb6a919d4dd12e14a781708d4be254f13f492f8
parent830496b901d0abd99f6413015e8ad1d70efb724e (diff)
downloadspark-8d775448c221e490dc369a1c23eeda629140c419.tar.gz
spark-8d775448c221e490dc369a1c23eeda629140c419.tar.bz2
spark-8d775448c221e490dc369a1c23eeda629140c419.zip
- Resolved some of the simpler TODOs related to different timeout and wait periods.
- Removed unused code.
-rw-r--r--src/scala/spark/Broadcast.scala80
1 files changed, 14 insertions, 66 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 432fc5bb4c..9e6355782f 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -435,7 +435,7 @@ extends BroadcastRecipe with Logging {
// TODO: Must fix this. This might never break if broadcast fails.
// We should be able to break and send false. Also need to kill threads
while (hasBlocks < totalBlocks) {
- Thread.sleep(1000)
+ Thread.sleep(BroadcastBT.MaxKnockInterval)
}
return true
@@ -473,14 +473,13 @@ extends BroadcastRecipe with Logging {
}
// Sleep for a while before starting some more threads
- // TODO: Whats up with this?
- Thread.sleep (500)
+ Thread.sleep (BroadcastBT.MinKnockInterval)
}
// Shutdown the thread pool
threadPool.shutdown
}
- // TODO: Right now picking the one that has the most blocks this peer wants
+ // Right now picking the one that has the most blocks this peer wants
// Also picking peer randomly if no one has anything interesting
private def pickPeerToTalkTo: SourceInfo = {
var curPeer: SourceInfo = null
@@ -509,7 +508,7 @@ extends BroadcastRecipe with Logging {
}
// Always pick randomly or randomly pick randomly?
- // TODO: Now its always
+ // Now always picking randomly
if (curPeer == null && peersNotInUse.size > 0) {
// Pick uniformly the i'th required peer
var i = BroadcastBT.ranGen.nextInt (peersNotInUse.size)
@@ -548,9 +547,8 @@ extends BroadcastRecipe with Logging {
}
}
- // TODO: Fix a value for timeout timer
var timeOutTimer = new Timer
- timeOutTimer.schedule (timeOutTask, 1 * 1000)
+ timeOutTimer.schedule (timeOutTask, BroadcastBT.MaxKnockInterval)
logInfo ("TalkToPeer started... => " + peerToTalkTo)
@@ -638,13 +636,13 @@ extends BroadcastRecipe with Logging {
case eofe: java.io.EOFException => { }
case e: Exception => {
logInfo ("TalktoPeer had a " + e)
- // Remove this pInfo from listOfSources
- // TODO: We probably should have the following in some form, but not
+ // TODO: Remove 'newPeerToTalkTo' from listOfSources
+ // We probably should have the following in some form, but not
// really here. This exception can happen if the sender just breaks connection
-// listOfSources.synchronized {
-// logInfo ("Exception in TalkToPeer. Removing source: " + peerToTalkTo)
-// listOfSources = listOfSources - peerToTalkTo
-// }
+ // listOfSources.synchronized {
+ // logInfo ("Exception in TalkToPeer. Removing source: " + peerToTalkTo)
+ // listOfSources = listOfSources - peerToTalkTo
+ // }
}
} finally {
// blockToAskFor != -1 => there was an exception
@@ -918,8 +916,7 @@ extends BroadcastRecipe with Logging {
class ServeMultipleRequests
extends Thread with Logging {
override def run: Unit = {
- // TODO: Not sure if this will be able to fix the number of outgoing links
- // We should have a timeout mechanism on the receiver side
+ // Server at most BroadcastBT.MaxRxPeers peers
var threadPool =
BroadcastBT.newDaemonFixedThreadPool(BroadcastBT.MaxRxPeers)
@@ -1006,13 +1003,6 @@ extends BroadcastRecipe with Logging {
sendBlock (blockToSend)
rxSourceInfo.hasBlocksBitVector.set (blockToSend)
-// val sentBlock = pickAndSendBlock (rxSourceInfo.hasBlocksBitVector)
-// if (sentBlock < 0) {
-// keepSending = false
-// } else {
-// rxSourceInfo.hasBlocksBitVector.set (sentBlock)
-// }
-
numBlocksToSend = numBlocksToSend - 1
// Receive latest SourceInfo from the receiver
@@ -1023,7 +1013,6 @@ extends BroadcastRecipe with Logging {
curTime = System.currentTimeMillis
}
} catch {
- // TODO: Need to add better exception handling here
// If something went wrong, e.g., the worker at the other end died etc.
// then close everything up
// Exception can happen if the receiver stops receiving
@@ -1045,46 +1034,11 @@ extends BroadcastRecipe with Logging {
oos.flush
} catch {
case e: Exception => {
- logInfo ("pickAndSendBlock had a " + e)
+ logInfo ("sendBlock had a " + e)
}
}
logInfo ("Sent block: " + blockToSend + " to " + clientSocket)
}
-
- // Right now picking the rarest first block
- private def pickAndSendBlock (rxHasBlocksBitVector: BitSet): Int = {
- var blockIndex = -1
- var minCopies = Int.MaxValue
- var nextIndex = -1
-
- // Figure out which blocks the receiver doesn't have
- var tempHasBlocksBitVector =
- rxHasBlocksBitVector.clone.asInstanceOf[BitSet]
- tempHasBlocksBitVector.flip (0, tempHasBlocksBitVector.size)
- hasBlocksBitVector.synchronized {
- tempHasBlocksBitVector.and (hasBlocksBitVector)
- }
-
- // Traverse over all the blocks
- numCopiesSent.synchronized {
- do {
- nextIndex = tempHasBlocksBitVector.nextSetBit(nextIndex + 1)
- if (nextIndex != -1 && numCopiesSent(nextIndex) < minCopies) {
- minCopies = numCopiesSent(nextIndex)
- numCopiesSent(nextIndex) = numCopiesSent(nextIndex) + 1
- blockIndex = nextIndex
- }
- } while (nextIndex != -1)
- }
-
- if (blockIndex < 0) {
- logInfo ("No block to send...")
- } else {
- sendBlock (blockIndex)
- }
-
- return blockIndex
- }
}
}
}
@@ -1231,12 +1185,8 @@ extends Logging {
private var trackMV: TrackMultipleValues = null
- // newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * curSpeed
- private val ALPHA = 0.7
- // 125.0 MBps = 1 Gbps link
- private val MaxMBps_ = 125.0
-
// A peer syncs back to Guide after waiting randomly within following limits
+ // Also used thoughout the code for small and large waits/timeouts
private var MinKnockInterval_ = 500
private var MaxKnockInterval_ = 999
@@ -1316,8 +1266,6 @@ extends Logging {
def isMaster = isMaster_
- def MaxMBps = MaxMBps_
-
def MinKnockInterval = MinKnockInterval_
def MaxKnockInterval = MaxKnockInterval_