diff options
author | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2011-01-13 19:41:04 -0800 |
---|---|---|
committer | Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> | 2011-01-13 19:41:04 -0800 |
commit | 025b5485b7087725bf10e6b47064438a25ce5ddb (patch) | |
tree | 0a985b8fa8a4b32a2d35228438072c72099a10e3 | |
parent | 5bf6369220570d739ab31301fc9f8e22244a5da5 (diff) | |
download | spark-025b5485b7087725bf10e6b47064438a25ce5ddb.tar.gz spark-025b5485b7087725bf10e6b47064438a25ce5ddb.tar.bz2 spark-025b5485b7087725bf10e6b47064438a25ce5ddb.zip |
Changed speed estimation and time remaining variables to Double instead of Int.
-rw-r--r-- | src/scala/spark/ShuffleTrackerStrategy.scala | 17 |
1 files changed, 8 insertions, 9 deletions
diff --git a/src/scala/spark/ShuffleTrackerStrategy.scala b/src/scala/spark/ShuffleTrackerStrategy.scala
index cfc5fcc1b0..6eabfd7dc7 100644
--- a/src/scala/spark/ShuffleTrackerStrategy.scala
+++ b/src/scala/spark/ShuffleTrackerStrategy.scala
@@ -151,7 +151,7 @@ extends ShuffleTrackerStrategy with Logging {
  private var hasBlocksPerInputSplit: Array[Array[Int]] = null
  // Stored in bytes per millisecond
- private var speedPerInputSplit: Array[Array[Int]] = null
+ private var speedPerInputSplit: Array[Array[Double]] = null
  private var curConnectionsPerLoc: Array[Int] = null
  private var totalConnectionsPerLoc: Array[Int] = null
@@ -172,7 +172,7 @@ extends ShuffleTrackerStrategy with Logging {
  hasBlocksPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => 0)
  // Initialize to -1
- speedPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => -1)
+ speedPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => -1.0)
  curConnectionsPerLoc = Array.tabulate(numMappers)(_ => 0)
  totalConnectionsPerLoc = Array.tabulate(numMappers)(_ => 0)
@@ -183,22 +183,21 @@ extends ShuffleTrackerStrategy with Logging {
  // Estimate time remaining to finish receiving for all reducer/mapper pairs
  // If speed is unknown or zero then make it 1 to give a large estimate
- var individualEstimates = Array.tabulate(numReducers, numMappers)((_,_) => 0)
+ var individualEstimates = Array.tabulate(numReducers, numMappers)((_,_) => 0.0)
  for (i <- 0 until numReducers; j <- 0 until numMappers) {
  var blocksRemaining = totalBlocksPerInputSplit(i)(j) - hasBlocksPerInputSplit(i)(j)
  assert(blocksRemaining >= 0)
  individualEstimates(i)(j) =
- { if (blocksRemaining < 0) 0 else blocksRemaining } *
- Shuffle.BlockSize /
- { if (speedPerInputSplit(i)(j) <= 0) 1 else speedPerInputSplit(i)(j) }
+ 1.0 * blocksRemaining * Shuffle.BlockSize /
+ { if (speedPerInputSplit(i)(j) <= 0.0) 1.0 else speedPerInputSplit(i)(j) }
  }
- // Check if all individualEstimates entries have non-zero values
+ // Check if all speedPerInputSplit entries have non-zero values
  var estimationComplete = true
  for (i <- 0 until numReducers; j <- 0 until numMappers) {
- if (speedPerInputSplit(i)(j) < 0) {
+ if (speedPerInputSplit(i)(j) < 0.0) {
  estimationComplete = false
  }
  }
@@ -285,7 +284,7 @@ extends ShuffleTrackerStrategy with Logging {
  // TODO: We are forgetting the old speed. Can use averaging at some point.
  if (receptionStat.bytesReceived > 0) {
  speedPerInputSplit(reducerSplitInfo.splitId)(receptionStat.serverSplitIndex) =
- receptionStat.bytesReceived / (receptionStat.timeSpent + 1)
+ 1.0 * receptionStat.bytesReceived / (receptionStat.timeSpent + 1.0)
  }
  // Update current connections to the mapper