Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala               |  29
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala            | 119
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala |   3
4 files changed, 115 insertions(+), 44 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index f92189b707..4cb0bd4142 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -349,7 +349,6 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}
private[spark] object MapOutputTracker {
- private val LOG_BASE = 1.1
// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -385,34 +384,8 @@ private[spark] object MapOutputTracker {
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
- (status.location, decompressSize(status.compressedSizes(reduceId)))
+ (status.location, status.getSizeForBlock(reduceId))
}
}
}
-
- /**
- * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
- * We do this by encoding the log base 1.1 of the size as an integer, which can support
- * sizes up to 35 GB with at most 10% error.
- */
- def compressSize(size: Long): Byte = {
- if (size == 0) {
- 0
- } else if (size <= 1L) {
- 1
- } else {
- math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
- }
- }
-
- /**
- * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
- */
- def decompressSize(compressedSize: Byte): Long = {
- if (compressedSize == 0) {
- 0
- } else {
- math.pow(LOG_BASE, compressedSize & 0xFF).toLong
- }
- }
}
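
The compressSize / decompressSize pair removed here is not dropped; it moves into the MapStatus companion object below. As a standalone illustration of the scheme (a minimal sketch, independent of the Spark classes in this diff): each size is encoded as ceil(log base 1.1 of the size) in a single byte, so sizes up to roughly 35 GB round-trip with at most about 10% error.

    object SizeEncodingSketch {
      private val LogBase = 1.1

      // Encode a byte count in a single byte: ceil(log_1.1(size)), capped at 255.
      def compress(size: Long): Byte = {
        if (size == 0) {
          0
        } else if (size <= 1L) {
          1
        } else {
          math.min(255, math.ceil(math.log(size) / math.log(LogBase)).toInt).toByte
        }
      }

      // Reverse operation: 1.1^encoded, which overestimates the true size by at most ~10%.
      def decompress(encoded: Byte): Long = {
        if (encoded == 0) 0 else math.pow(LogBase, encoded & 0xFF).toLong
      }

      def main(args: Array[String]): Unit = {
        for (size <- Seq(1L, 1024L, 1L << 20, 1L << 30)) {
          val approx = decompress(compress(size))
          val errPct = (approx - size).toDouble / size * 100
          println(f"$size%11d -> $approx%11d  (+$errPct%.1f%%)")
        }
      }
    }

Running it shows the overestimate growing toward the 10% bound as sizes approach the 255-bucket ceiling.
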
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index d3f63ff92a..e25096ea92 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -24,22 +24,123 @@ import org.apache.spark.storage.BlockManagerId
/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
- * The map output sizes are compressed using MapOutputTracker.compressSize.
*/
-private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
- extends Externalizable {
+private[spark] sealed trait MapStatus {
+ /** Location where this task was run. */
+ def location: BlockManagerId
- def this() = this(null, null) // For deserialization only
+ /** Estimated size for the reduce block, in bytes. */
+ def getSizeForBlock(reduceId: Int): Long
+}
+
+
+private[spark] object MapStatus {
+
+ def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
+ if (uncompressedSizes.length > 2000) {
+ new HighlyCompressedMapStatus(loc, uncompressedSizes)
+ } else {
+ new CompressedMapStatus(loc, uncompressedSizes)
+ }
+ }
+
+ private[this] val LOG_BASE = 1.1
+
+ /**
+ * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
+ * We do this by encoding the log base 1.1 of the size as an integer, which can support
+ * sizes up to 35 GB with at most 10% error.
+ */
+ def compressSize(size: Long): Byte = {
+ if (size == 0) {
+ 0
+ } else if (size <= 1L) {
+ 1
+ } else {
+ math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
+ }
+ }
+
+ /**
+ * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
+ */
+ def decompressSize(compressedSize: Byte): Long = {
+ if (compressedSize == 0) {
+ 0
+ } else {
+ math.pow(LOG_BASE, compressedSize & 0xFF).toLong
+ }
+ }
+}
+
+
+/**
+ * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is
+ * represented using a single byte.
+ *
+ * @param loc location where the task is being executed.
+ * @param compressedSizes size of the blocks, indexed by reduce partition id.
+ */
+private[spark] class CompressedMapStatus(
+ private[this] var loc: BlockManagerId,
+ private[this] var compressedSizes: Array[Byte])
+ extends MapStatus with Externalizable {
+
+ protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only
+
+ def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+ this(loc, uncompressedSizes.map(MapStatus.compressSize))
+ }
- def writeExternal(out: ObjectOutput) {
- location.writeExternal(out)
+ override def location: BlockManagerId = loc
+
+ override def getSizeForBlock(reduceId: Int): Long = {
+ MapStatus.decompressSize(compressedSizes(reduceId))
+ }
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ loc.writeExternal(out)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}
- def readExternal(in: ObjectInput) {
- location = BlockManagerId(in)
- compressedSizes = new Array[Byte](in.readInt())
+ override def readExternal(in: ObjectInput): Unit = {
+ loc = BlockManagerId(in)
+ val len = in.readInt()
+ compressedSizes = new Array[Byte](len)
in.readFully(compressedSizes)
}
}
+
+
+/**
+ * A [[MapStatus]] implementation that only stores the average size of the blocks.
+ *
+ * @param loc location where the task is being executed.
+ * @param avgSize average size of all the blocks
+ */
+private[spark] class HighlyCompressedMapStatus(
+ private[this] var loc: BlockManagerId,
+ private[this] var avgSize: Long)
+ extends MapStatus with Externalizable {
+
+ def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+ this(loc, uncompressedSizes.sum / uncompressedSizes.length)
+ }
+
+ protected def this() = this(null, 0L) // For deserialization only
+
+ override def location: BlockManagerId = loc
+
+ override def getSizeForBlock(reduceId: Int): Long = avgSize
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ loc.writeExternal(out)
+ out.writeLong(avgSize)
+ }
+
+ override def readExternal(in: ObjectInput): Unit = {
+ loc = BlockManagerId(in)
+ avgSize = in.readLong()
+ }
+}
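
The MapStatus.apply factory above switches representation at 2000 reduce partitions: below that, one byte per block (CompressedMapStatus); above it, a single average per map task (HighlyCompressedMapStatus), trading per-block accuracy for driver memory. A back-of-envelope sketch of the payload involved, using only the numbers visible in this diff and ignoring BlockManagerId and serialization framing (not Spark code):

    object MapStatusFootprintSketch {
      // 1 byte per (map task, reduce partition) pair with CompressedMapStatus.
      def compressedBytes(mapTasks: Int, reducePartitions: Int): Long =
        mapTasks.toLong * reducePartitions

      // One Long average per map task with HighlyCompressedMapStatus.
      def highlyCompressedBytes(mapTasks: Int): Long =
        mapTasks.toLong * 8

      def main(args: Array[String]): Unit = {
        val (m, r) = (10000, 10000)  // a large shuffle: 10k map tasks x 10k reduce partitions
        println(s"CompressedMapStatus payload:       ~${compressedBytes(m, r) / (1L << 20)} MB")
        println(s"HighlyCompressedMapStatus payload: ~${highlyCompressedBytes(m) / 1024} KB")
      }
    }

For a 10000 x 10000 shuffle that is roughly 95 MB of size bytes versus about 78 KB of averages, which is why the exact per-block encoding is kept only for smaller shuffles.
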
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 4b9454d75a..746ed33b54 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -103,13 +103,11 @@ private[spark] class HashShuffleWriter[K, V](
private def commitWritesAndBuildStatus(): MapStatus = {
// Commit the writes. Get the size of each bucket block (total block size).
- val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
+ val sizes: Array[Long] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commitAndClose()
- val size = writer.fileSegment().length
- MapOutputTracker.compressSize(size)
+ writer.fileSegment().length
}
-
- new MapStatus(blockManager.blockManagerId, compressedSizes)
+ MapStatus(blockManager.blockManagerId, sizes)
}
private def revertWrites(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 89a78d6982..927481b72c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
- mapStatus = new MapStatus(blockManager.blockManagerId,
- partitionLengths.map(MapOutputTracker.compressSize))
+ mapStatus = MapStatus(blockManager.blockManagerId, partitionLengths)
}
/** Close this writer, passing along whether the map completed */
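
Both writer changes follow the same pattern: commit the writes, collect the raw per-partition byte counts, and hand them to the MapStatus factory instead of calling MapOutputTracker.compressSize inside the writer. A hedged sketch of that calling convention with stand-in types (FakeWriter, FileSegment, and buildStatus are placeholders for illustration, not Spark API):

    object WriterSideSketch {
      // Stand-ins for the Spark types touched in the two writer hunks above; not real API.
      final case class FileSegment(length: Long)
      final class FakeWriter(len: Long) {
        def commitAndClose(): Unit = ()
        def fileSegment(): FileSegment = FileSegment(len)
      }

      // Stand-in for MapStatus.apply: the writer no longer decides how sizes are encoded.
      def buildStatus(sizes: Array[Long]): String =
        if (sizes.length > 2000) s"HighlyCompressed(avg = ${sizes.sum / sizes.length})"
        else s"Compressed(${sizes.length} one-byte sizes)"

      def main(args: Array[String]): Unit = {
        val writers = Array(new FakeWriter(1024L), new FakeWriter(0L), new FakeWriter(1L << 20))
        // Same shape as commitWritesAndBuildStatus after the change: raw lengths in, status out.
        val sizes: Array[Long] = writers.map { w => w.commitAndClose(); w.fileSegment().length }
        println(buildStatus(sizes))
      }
    }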