aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-11-27 19:25:45 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-11-27 19:25:45 -0800
commit60cb3e9380b81d0b9d72c6668be64757642a5f8a (patch)
tree0fadf10bf19db2281582dfe348fb4aa2d69db078
parentcd16eab0dbad9d6186f064e5d95259562eb51628 (diff)
parentf24bfd2dd1f5c271b05ac9f166b9d1b6d938a440 (diff)
downloadspark-60cb3e9380b81d0b9d72c6668be64757642a5f8a.tar.gz
spark-60cb3e9380b81d0b9d72c6668be64757642a5f8a.tar.bz2
spark-60cb3e9380b81d0b9d72c6668be64757642a5f8a.zip
Merge pull request #312 from rxin/pde_size_compress
For size compression, compress non zero values into non zero values.
-rw-r--r--core/src/main/scala/spark/MapOutputTracker.scala29
-rw-r--r--core/src/test/scala/spark/MapOutputTrackerSuite.scala4
2 files changed, 18 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 45441aa5e5..fcf725a255 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -2,6 +2,10 @@ package spark
import java.io._
import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
import akka.actor._
import akka.dispatch._
@@ -11,16 +15,13 @@ import akka.util.Duration
import akka.util.Timeout
import akka.util.duration._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-
-import scheduler.MapStatus
+import spark.scheduler.MapStatus
import spark.storage.BlockManagerId
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
- extends MapOutputTrackerMessage
+ extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
@@ -88,14 +89,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
}
mapStatuses.put(shuffleId, new Array[MapStatus](numMaps))
}
-
+
def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
var array = mapStatuses.get(shuffleId)
array.synchronized {
array(mapId) = status
}
}
-
+
def registerMapOutputs(
shuffleId: Int,
statuses: Array[MapStatus],
@@ -119,10 +120,10 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
}
}
-
+
// Remembers which map output locations are currently being fetched on a worker
val fetching = new HashSet[Int]
-
+
// Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
val statuses = mapStatuses.get(shuffleId)
@@ -149,7 +150,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
val host = System.getProperty("spark.hostname", Utils.localHostName)
val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
val fetchedStatuses = deserializeStatuses(fetchedBytes)
-
+
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
fetching.synchronized {
@@ -254,8 +255,10 @@ private[spark] object MapOutputTracker {
* sizes up to 35 GB with at most 10% error.
*/
def compressSize(size: Long): Byte = {
- if (size <= 1L) {
+ if (size == 0) {
0
+ } else if (size <= 1L) {
+ 1
} else {
math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
}
@@ -266,7 +269,7 @@ private[spark] object MapOutputTracker {
*/
def decompressSize(compressedSize: Byte): Long = {
if (compressedSize == 0) {
- 1
+ 0
} else {
math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
}
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 4e9717d871..dee45b6e8f 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -5,7 +5,7 @@ import org.scalatest.FunSuite
class MapOutputTrackerSuite extends FunSuite {
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
- assert(MapOutputTracker.compressSize(1L) === 0)
+ assert(MapOutputTracker.compressSize(1L) === 1)
assert(MapOutputTracker.compressSize(2L) === 8)
assert(MapOutputTracker.compressSize(10L) === 25)
assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145)
@@ -15,7 +15,7 @@ class MapOutputTrackerSuite extends FunSuite {
}
test("decompressSize") {
- assert(MapOutputTracker.decompressSize(0) === 1)
+ assert(MapOutputTracker.decompressSize(0) === 0)
for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size))
assert(size2 >= 0.99 * size && size2 <= 1.11 * size,