Diffstat (limited to 'core/src/main/scala/org')
 core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 64 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index e25096ea92..2ab5d9637b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import org.roaringbitmap.RoaringBitmap
+
import org.apache.spark.storage.BlockManagerId
/**
@@ -29,7 +31,12 @@ private[spark] sealed trait MapStatus {
/** Location where this task was run. */
def location: BlockManagerId
- /** Estimated size for the reduce block, in bytes. */
+ /**
+ * Estimated size for the reduce block, in bytes.
+ *
+ * If a block is non-empty, then this method MUST return a non-zero size. This invariant is
+ * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
+ */
def getSizeForBlock(reduceId: Int): Long
}
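
A toy illustration of the invariant (a hypothetical fetcher loop, not Spark's actual one): since fetchers may skip zero-size blocks, any non-empty block whose status under-reports its size as zero would silently never be requested.

  // Hypothetical sketch: only blocks with a reported size > 0 are fetched,
  // so a non-empty block with a zero-size estimate would be lost.
  def blocksToFetch(sizeForBlock: Int => Long, numReduceBlocks: Int): Seq[Int] =
    (0 until numReduceBlocks).filter(reduceId => sizeForBlock(reduceId) > 0)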
@@ -38,7 +45,7 @@ private[spark] object MapStatus {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > 2000) {
- new HighlyCompressedMapStatus(loc, uncompressedSizes)
+ HighlyCompressedMapStatus(loc, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
}
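
The 2000-block cutoff is a space tradeoff: CompressedMapStatus stores one log-encoded byte per reduce block (the compressSize encoding elsewhere in this file), so the driver retains on the order of mapTasks * reduceTasks bytes of statuses. A back-of-the-envelope sketch (the task counts are illustrative assumptions, not from this patch):

  // Rough sizing sketch; the figures are assumptions for illustration.
  val mapTasks = 10000
  val reduceTasks = 10000
  // CompressedMapStatus keeps one byte per (map, reduce) pair on the driver.
  val driverBytes = mapTasks.toLong * reduceTasks
  println(f"~${driverBytes / math.pow(2, 20)}%.0f MiB of map statuses") // ~95 MiB
  // HighlyCompressedMapStatus replaces that with an average plus a compressed
  // bitmap of empty blocks, typically a few KiB per task at this scale.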
@@ -112,35 +119,80 @@ private[spark] class CompressedMapStatus(
}
}
-
/**
- * A [[MapStatus]] implementation that only stores the average size of the blocks.
+ * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
+ * plus a bitmap for tracking which blocks are non-empty. During serialization, this bitmap
+ * is compressed.
*
- * @param loc location where the task is being executed.
- * @param avgSize average size of all the blocks
+ * @param loc location where the task is being executed
+ * @param numNonEmptyBlocks the number of non-empty blocks
+ * @param emptyBlocks a bitmap tracking which blocks are empty
+ * @param avgSize average size of the non-empty blocks
*/
-private[spark] class HighlyCompressedMapStatus(
+private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
+ private[this] var numNonEmptyBlocks: Int,
+ private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {
- def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
- this(loc, uncompressedSizes.sum / uncompressedSizes.length)
- }
+ // loc could be null when the default constructor is called during deserialization
+ require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
+ "Average size can only be zero for map stages that produced no output")
- protected def this() = this(null, 0L) // For deserialization only
+ protected def this() = this(null, -1, null, -1) // For deserialization only
override def location: BlockManagerId = loc
- override def getSizeForBlock(reduceId: Int): Long = avgSize
+ override def getSizeForBlock(reduceId: Int): Long = {
+ if (emptyBlocks.contains(reduceId)) {
+ 0
+ } else {
+ avgSize
+ }
+ }
override def writeExternal(out: ObjectOutput): Unit = {
loc.writeExternal(out)
+ emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
}
override def readExternal(in: ObjectInput): Unit = {
loc = BlockManagerId(in)
+ emptyBlocks = new RoaringBitmap()
+ emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
}
+
+private[spark] object HighlyCompressedMapStatus {
+ def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
+ // We must keep track of which blocks are empty so that we don't report a zero-sized
+ // block as being non-empty (or vice-versa) when using the average block size.
+ var i = 0
+ var numNonEmptyBlocks: Int = 0
+ var totalSize: Long = 0
+ // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
+ // blocks. From a performance standpoint, we benefit from tracking empty blocks because
+ // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+ val emptyBlocks = new RoaringBitmap()
+ val totalNumBlocks = uncompressedSizes.length
+ while (i < totalNumBlocks) {
+ val size = uncompressedSizes(i)
+ if (size > 0) {
+ numNonEmptyBlocks += 1
+ totalSize += size
+ } else {
+ emptyBlocks.add(i)
+ }
+ i += 1
+ }
+ val avgSize = if (numNonEmptyBlocks > 0) {
+ totalSize / numNonEmptyBlocks
+ } else {
+ 0
+ }
+ new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
+ }
+}
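
Both the factory's summarization and the bitmap (de)serialization can be exercised standalone. A minimal sketch, assuming only RoaringBitmap on the classpath; summarize is a hypothetical helper mirroring the loop above, which avoids the private constructor and the need for a BlockManagerId:

  import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
  import org.roaringbitmap.RoaringBitmap

  // Hypothetical mirror of HighlyCompressedMapStatus.apply's summarization loop.
  def summarize(sizes: Array[Long]): (RoaringBitmap, Long) = {
    val empty = new RoaringBitmap()
    var nonEmpty = 0
    var total = 0L
    sizes.indices.foreach { i =>
      if (sizes(i) > 0) { nonEmpty += 1; total += sizes(i) } else empty.add(i)
    }
    (empty, if (nonEmpty > 0) total / nonEmpty else 0L)
  }

  val (emptyBlocks, avgSize) = summarize(Array(0L, 100L, 0L, 300L))
  assert(emptyBlocks.contains(0) && emptyBlocks.contains(2)) // these report size 0
  assert(avgSize == 200)                                     // (100 + 300) / 2

  // Round-trip the bitmap the same way writeExternal/readExternal do above.
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  emptyBlocks.writeExternal(out)
  out.flush()
  val restored = new RoaringBitmap()
  restored.readExternal(
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)))
  assert(restored.equals(emptyBlocks))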