aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> 2011-01-13 19:41:04 -0800
committer Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> 2011-01-13 19:41:04 -0800
commit 025b5485b7087725bf10e6b47064438a25ce5ddb (patch)
tree 0a985b8fa8a4b32a2d35228438072c72099a10e3
parent 5bf6369220570d739ab31301fc9f8e22244a5da5 (diff)
download spark-025b5485b7087725bf10e6b47064438a25ce5ddb.tar.gz
spark-025b5485b7087725bf10e6b47064438a25ce5ddb.tar.bz2
spark-025b5485b7087725bf10e6b47064438a25ce5ddb.zip
Changed speed estimation and time remaining variables to Double instead of Int.
-rw-r--r-- src/scala/spark/ShuffleTrackerStrategy.scala 17
1 file changed, 8 insertions, 9 deletions
diff --git a/src/scala/spark/ShuffleTrackerStrategy.scala b/src/scala/spark/ShuffleTrackerStrategy.scala
index cfc5fcc1b0..6eabfd7dc7 100644
--- a/src/scala/spark/ShuffleTrackerStrategy.scala
+++ b/src/scala/spark/ShuffleTrackerStrategy.scala
@@ -151,7 +151,7 @@ extends ShuffleTrackerStrategy with Logging {
private var hasBlocksPerInputSplit: Array[Array[Int]] = null
// Stored in bytes per millisecond
- private var speedPerInputSplit: Array[Array[Int]] = null
+ private var speedPerInputSplit: Array[Array[Double]] = null
private var curConnectionsPerLoc: Array[Int] = null
private var totalConnectionsPerLoc: Array[Int] = null
@@ -172,7 +172,7 @@ extends ShuffleTrackerStrategy with Logging {
hasBlocksPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => 0)
// Initialize to -1
- speedPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => -1)
+ speedPerInputSplit = Array.tabulate(numReducers, numMappers)((_,_) => -1.0)
curConnectionsPerLoc = Array.tabulate(numMappers)(_ => 0)
totalConnectionsPerLoc = Array.tabulate(numMappers)(_ => 0)
@@ -183,22 +183,21 @@ extends ShuffleTrackerStrategy with Logging {
// Estimate time remaining to finish receiving for all reducer/mapper pairs
// If speed is unknown or zero then make it 1 to give a large estimate
- var individualEstimates = Array.tabulate(numReducers, numMappers)((_,_) => 0)
+ var individualEstimates = Array.tabulate(numReducers, numMappers)((_,_) => 0.0)
for (i <- 0 until numReducers; j <- 0 until numMappers) {
var blocksRemaining = totalBlocksPerInputSplit(i)(j) -
hasBlocksPerInputSplit(i)(j)
assert(blocksRemaining >= 0)
individualEstimates(i)(j) =
- { if (blocksRemaining < 0) 0 else blocksRemaining } *
- Shuffle.BlockSize /
- { if (speedPerInputSplit(i)(j) <= 0) 1 else speedPerInputSplit(i)(j) }
+ 1.0 * blocksRemaining * Shuffle.BlockSize /
+ { if (speedPerInputSplit(i)(j) <= 0.0) 1.0 else speedPerInputSplit(i)(j) }
}
- // Check if all individualEstimates entries have non-zero values
+ // Check if all speedPerInputSplit entries have non-zero values
var estimationComplete = true
for (i <- 0 until numReducers; j <- 0 until numMappers) {
- if (speedPerInputSplit(i)(j) < 0) {
+ if (speedPerInputSplit(i)(j) < 0.0) {
estimationComplete = false
}
}
@@ -285,7 +284,7 @@ extends ShuffleTrackerStrategy with Logging {
// TODO: We are forgetting the old speed. Can use averaging at some point.
if (receptionStat.bytesReceived > 0) {
speedPerInputSplit(reducerSplitInfo.splitId)(receptionStat.serverSplitIndex) =
- receptionStat.bytesReceived / (receptionStat.timeSpent + 1)
+ 1.0 * receptionStat.bytesReceived / (receptionStat.timeSpent + 1.0)
}
// Update current connections to the mapper