aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-13 20:18:48 -0800
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-13 20:18:48 -0800
commitc3f354c6fc99424c0b405e65266ee0a8c82dd07d (patch)
treeac98f596d38ce5ad3c502d4717226ff6bfabcbf6
parent9915fd22c8facfa1b67b61e70806237bbd7d4c34 (diff)
downloadspark-c3f354c6fc99424c0b405e65266ee0a8c82dd07d.tar.gz
spark-c3f354c6fc99424c0b405e65266ee0a8c82dd07d.tar.bz2
spark-c3f354c6fc99424c0b405e65266ee0a8c82dd07d.zip
Guide now select random peers to select sources instead of rolling statically.
Random is good. Deterministic is bad. Well, mostly.
-rw-r--r--src/scala/spark/Broadcast.scala42
1 files changed, 30 insertions, 12 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index 2ffc37b4bb..1b3c4e9c48 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -832,9 +832,6 @@ extends BroadcastRecipe with Logging {
private var sourceInfo: SourceInfo = null
private var selectedSources: ListBuffer[SourceInfo] = null
- // Used to select a rolling window of peers from listOfSources
- private var rollOverIndex = 0
-
override def run: Unit = {
try {
logInfo ("new GuideSingleRequest is running")
@@ -878,18 +875,39 @@ extends BroadcastRecipe with Logging {
return selectedSources
}
- var curIndex = rollOverIndex
-
listOfSources.synchronized {
- do {
- if (listOfSources(curIndex) != skipSourceInfo) {
- selectedSources = selectedSources + listOfSources(curIndex)
+ if (listOfSources.size <= BroadcastBT.MaxPeersInGuideResponse) {
+ selectedSources = listOfSources.clone
+ } else {
+ var picksLeft = BroadcastBT.MaxPeersInGuideResponse
+ var alreadyPicked = new BitSet (listOfSources.size)
+
+ while (picksLeft > 0) {
+ var i = -1
+
+ do {
+ i = BroadcastBT.ranGen.nextInt (listOfSources.size)
+ } while (alreadyPicked.get(i))
+
+ var peerIter = listOfSources.iterator
+ var curPeer = peerIter.next
+
+ while (i > 0) {
+ curPeer = peerIter.next
+ i = i - 1
+ }
+
+ selectedSources = selectedSources + curPeer
+ alreadyPicked.set (i)
+
+ picksLeft = picksLeft - 1
}
- curIndex = (curIndex + 1) % listOfSources.size
- } while (curIndex != rollOverIndex &&
- selectedSources.size != BroadcastBT.MaxPeersInGuideResponse)
+ }
}
- rollOverIndex = curIndex
+
+ // Remove the receiving source (if present)
+ selectedSources = selectedSources - skipSourceInfo
+
return selectedSources
}
}