diff options
author | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-11-13 20:18:48 -0800 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2010-11-13 20:18:48 -0800 |
commit | c3f354c6fc99424c0b405e65266ee0a8c82dd07d (patch) | |
tree | ac98f596d38ce5ad3c502d4717226ff6bfabcbf6 | |
parent | 9915fd22c8facfa1b67b61e70806237bbd7d4c34 (diff) | |
download | spark-c3f354c6fc99424c0b405e65266ee0a8c82dd07d.tar.gz spark-c3f354c6fc99424c0b405e65266ee0a8c82dd07d.tar.bz2 spark-c3f354c6fc99424c0b405e65266ee0a8c82dd07d.zip |
Guide now select random peers to select sources instead of rolling statically.
Random is good. Deterministic is bad. Well, mostly.
-rw-r--r-- | src/scala/spark/Broadcast.scala | 42 |
1 files changed, 30 insertions, 12 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 2ffc37b4bb..1b3c4e9c48 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -832,9 +832,6 @@ extends BroadcastRecipe with Logging { private var sourceInfo: SourceInfo = null private var selectedSources: ListBuffer[SourceInfo] = null - // Used to select a rolling window of peers from listOfSources - private var rollOverIndex = 0 - override def run: Unit = { try { logInfo ("new GuideSingleRequest is running") @@ -878,18 +875,39 @@ extends BroadcastRecipe with Logging { return selectedSources } - var curIndex = rollOverIndex - listOfSources.synchronized { - do { - if (listOfSources(curIndex) != skipSourceInfo) { - selectedSources = selectedSources + listOfSources(curIndex) + if (listOfSources.size <= BroadcastBT.MaxPeersInGuideResponse) { + selectedSources = listOfSources.clone + } else { + var picksLeft = BroadcastBT.MaxPeersInGuideResponse + var alreadyPicked = new BitSet (listOfSources.size) + + while (picksLeft > 0) { + var i = -1 + + do { + i = BroadcastBT.ranGen.nextInt (listOfSources.size) + } while (alreadyPicked.get(i)) + + var peerIter = listOfSources.iterator + var curPeer = peerIter.next + + while (i > 0) { + curPeer = peerIter.next + i = i - 1 + } + + selectedSources = selectedSources + curPeer + alreadyPicked.set (i) + + picksLeft = picksLeft - 1 } - curIndex = (curIndex + 1) % listOfSources.size - } while (curIndex != rollOverIndex && - selectedSources.size != BroadcastBT.MaxPeersInGuideResponse) + } } - rollOverIndex = curIndex + + // Remove the receiving source (if present) + selectedSources = selectedSources - skipSourceInfo + return selectedSources } } |