aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-12 15:27:47 -0800
committerMosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>2010-11-12 15:27:47 -0800
commit9915fd22c8facfa1b67b61e70806237bbd7d4c34 (patch)
tree0d2b02570bc257475c1acabffacb57e4c95432f7
parentc6874d43709a0d34beb2323318abdd0762108d48 (diff)
downloadspark-9915fd22c8facfa1b67b61e70806237bbd7d4c34.tar.gz
spark-9915fd22c8facfa1b67b61e70806237bbd7d4c34.tar.bz2
spark-9915fd22c8facfa1b67b61e70806237bbd7d4c34.zip
Added BroadcastBT.EndGameFraction config option to control when to start end game.
-rw-r--r--conf/java-opts2
-rw-r--r--src/scala/spark/Broadcast.scala18
2 files changed, 16 insertions, 4 deletions
diff --git a/conf/java-opts b/conf/java-opts
index 60b564d73b..7fb9e50bbc 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.broadcast.MasterHostAddress=127.0.0.1 -Dspark.broadcast.MasterTrackerPort=11111 -Dspark.broadcast.BlockSize=256 -Dspark.broadcast.MaxRetryCount=2 -Dspark.broadcast.TrackerSocketTimeout=50000 -Dspark.broadcast.ServerSocketTimeout=10000 -Dspark.broadcast.MaxChatTime=500
+-Dspark.broadcast.MasterHostAddress=127.0.0.1 -Dspark.broadcast.MasterTrackerPort=11111 -Dspark.broadcast.BlockSize=256 -Dspark.broadcast.MaxRetryCount=2 -Dspark.broadcast.TrackerSocketTimeout=50000 -Dspark.broadcast.ServerSocketTimeout=10000 -Dspark.broadcast.MaxChatTime=500 -Dspark.broadcast.EndGameFraction=0.95
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index f6b3c01d60..2ffc37b4bb 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -664,9 +664,12 @@ extends BroadcastRecipe with Logging {
needBlocksBitVector = hasBlocksBitVector.clone.asInstanceOf[BitSet]
}
- blocksInRequestBitVector.synchronized {
- // Include blocks already in transmission
- needBlocksBitVector.or (blocksInRequestBitVector)
+ // Include blocks already in transmission ONLY IF
+ // BroadcastBT.EndGameFraction has NOT been achieved
+ if ((1.0 * hasBlocks / totalBlocks) < BroadcastBT.EndGameFraction) {
+ blocksInRequestBitVector.synchronized {
+ needBlocksBitVector.or (blocksInRequestBitVector)
+ }
}
// Find blocks that are neither here nor in transit
@@ -1227,6 +1230,10 @@ extends Logging {
// Peers can char at most this milliseconds or transfer this number of blocks
private var MaxChatTime_ = 250
private var MaxChatBlocks_ = 1024
+
+ // Fraction of blocks to receive before entering the end game
+ private var EndGameFraction_ = 1.0
+
def initialize (isMaster__ : Boolean): Unit = {
synchronized {
@@ -1263,6 +1270,9 @@ extends Logging {
MaxChatBlocks_ =
System.getProperty ("spark.broadcast.MaxChatBlocks", "1024").toInt
+ EndGameFraction_ =
+ System.getProperty ("spark.broadcast.EndGameFraction", "1.0").toDouble
+
isMaster_ = isMaster__
if (isMaster) {
@@ -1300,6 +1310,8 @@ extends Logging {
def MaxChatTime = MaxChatTime_
def MaxChatBlocks = MaxChatBlocks_
+ def EndGameFraction = EndGameFraction_
+
def registerValue (uuid: UUID, gInfo: SourceInfo): Unit = {
valueToGuideMap.synchronized {
valueToGuideMap += (uuid -> gInfo)