aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/pom.xml4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala76
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala53
4 files changed, 104 insertions, 34 deletions
diff --git a/core/pom.xml b/core/pom.xml
index a5a178079b..7b68dbaea4 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -133,6 +133,10 @@
<artifactId>chill-java</artifactId>
</dependency>
<dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index e25096ea92..2ab5d9637b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import org.roaringbitmap.RoaringBitmap
+
import org.apache.spark.storage.BlockManagerId
/**
@@ -29,7 +31,12 @@ private[spark] sealed trait MapStatus {
/** Location where this task was run. */
def location: BlockManagerId
- /** Estimated size for the reduce block, in bytes. */
+ /**
+ * Estimated size for the reduce block, in bytes.
+ *
+ * If a block is non-empty, then this method MUST return a non-zero size. This invariant is
+ * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
+ */
def getSizeForBlock(reduceId: Int): Long
}
@@ -38,7 +45,7 @@ private[spark] object MapStatus {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > 2000) {
- new HighlyCompressedMapStatus(loc, uncompressedSizes)
+ HighlyCompressedMapStatus(loc, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
}
@@ -112,35 +119,80 @@ private[spark] class CompressedMapStatus(
}
}
-
/**
- * A [[MapStatus]] implementation that only stores the average size of the blocks.
+ * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
+ * plus a bitmap for tracking which blocks are non-empty. During serialization, this bitmap
+ * is compressed.
*
- * @param loc location where the task is being executed.
- * @param avgSize average size of all the blocks
+ * @param loc location where the task is being executed
+ * @param numNonEmptyBlocks the number of non-empty blocks
+ * @param emptyBlocks a bitmap tracking which blocks are empty
+ * @param avgSize average size of the non-empty blocks
*/
-private[spark] class HighlyCompressedMapStatus(
+private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
+ private[this] var numNonEmptyBlocks: Int,
+ private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {
- def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
- this(loc, uncompressedSizes.sum / uncompressedSizes.length)
- }
+ // loc could be null when the default constructor is called during deserialization
+ require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
+ "Average size can only be zero for map stages that produced no output")
- protected def this() = this(null, 0L) // For deserialization only
+ protected def this() = this(null, -1, null, -1) // For deserialization only
override def location: BlockManagerId = loc
- override def getSizeForBlock(reduceId: Int): Long = avgSize
+ override def getSizeForBlock(reduceId: Int): Long = {
+ if (emptyBlocks.contains(reduceId)) {
+ 0
+ } else {
+ avgSize
+ }
+ }
override def writeExternal(out: ObjectOutput): Unit = {
loc.writeExternal(out)
+ emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
}
override def readExternal(in: ObjectInput): Unit = {
loc = BlockManagerId(in)
+ emptyBlocks = new RoaringBitmap()
+ emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
}
+
+private[spark] object HighlyCompressedMapStatus {
+ def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
+ // We must keep track of which blocks are empty so that we don't report a zero-sized
+ // block as being non-empty (or vice-versa) when using the average block size.
+ var i = 0
+ var numNonEmptyBlocks: Int = 0
+ var totalSize: Long = 0
+ // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
+ // blocks. From a performance standpoint, we benefit from tracking empty blocks because
+ // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+ val emptyBlocks = new RoaringBitmap()
+ val totalNumBlocks = uncompressedSizes.length
+ while (i < totalNumBlocks) {
+ var size = uncompressedSizes(i)
+ if (size > 0) {
+ numNonEmptyBlocks += 1
+ totalSize += size
+ } else {
+ emptyBlocks.add(i)
+ }
+ i += 1
+ }
+ val avgSize = if (numNonEmptyBlocks > 0) {
+ totalSize / numNonEmptyBlocks
+ } else {
+ 0
+ }
+ new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 465c1a8a43..6d2e696dc2 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -459,6 +459,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
}
+ test("collect large number of empty partitions") {
+ // Regression test for SPARK-4019
+ assert(sc.makeRDD(0 until 10, 1000).repartition(2001).collect().toSet === (0 until 10).toSet)
+ }
+
test("take") {
var nums = sc.makeRDD(Range(1, 1000), 1)
assert(nums.take(0).size === 0)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 79e04f046e..950c6dc58e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer
+import scala.util.Random
class MapStatusSuite extends FunSuite {
@@ -46,6 +47,26 @@ class MapStatusSuite extends FunSuite {
}
}
+ test("MapStatus should never report non-empty blocks' sizes as 0") {
+ import Math._
+ for (
+ numSizes <- Seq(1, 10, 100, 1000, 10000);
+ mean <- Seq(0L, 100L, 10000L, Int.MaxValue.toLong);
+ stddev <- Seq(0.0, 0.01, 0.5, 1.0)
+ ) {
+ val sizes = Array.fill[Long](numSizes)(abs(round(Random.nextGaussian() * stddev)) + mean)
+ val status = MapStatus(BlockManagerId("a", "b", 10), sizes)
+ val status1 = compressAndDecompressMapStatus(status)
+ for (i <- 0 until numSizes) {
+ if (sizes(i) != 0) {
+ val failureMessage = s"Failed with $numSizes sizes with mean=$mean, stddev=$stddev"
+ assert(status.getSizeForBlock(i) !== 0, failureMessage)
+ assert(status1.getSizeForBlock(i) !== 0, failureMessage)
+ }
+ }
+ }
+ }
+
test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) {
val sizes = Array.fill[Long](2001)(150L)
val status = MapStatus(null, sizes)
@@ -56,37 +77,25 @@ class MapStatusSuite extends FunSuite {
assert(status.getSizeForBlock(2000) === 150L)
}
- test(classOf[HighlyCompressedMapStatus].getName + ": estimated size is within 10%") {
- val sizes = Array.tabulate[Long](50) { i => i.toLong }
+ test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") {
+ val sizes = Array.tabulate[Long](3000) { i => i.toLong }
+ val avg = sizes.sum / sizes.filter(_ != 0).length
val loc = BlockManagerId("a", "b", 10)
val status = MapStatus(loc, sizes)
- val ser = new JavaSerializer(new SparkConf)
- val buf = ser.newInstance().serialize(status)
- val status1 = ser.newInstance().deserialize[MapStatus](buf)
+ val status1 = compressAndDecompressMapStatus(status)
+ assert(status1.isInstanceOf[HighlyCompressedMapStatus])
assert(status1.location == loc)
- for (i <- 0 until sizes.length) {
- // make sure the estimated size is within 10% of the input; note that we skip the very small
- // sizes because the compression is very lossy there.
+ for (i <- 0 until 3000) {
val estimate = status1.getSizeForBlock(i)
- if (estimate > 100) {
- assert(math.abs(estimate - sizes(i)) * 10 <= sizes(i),
- s"incorrect estimated size $estimate, original was ${sizes(i)}")
+ if (sizes(i) > 0) {
+ assert(estimate === avg)
}
}
}
- test(classOf[HighlyCompressedMapStatus].getName + ": estimated size should be the average size") {
- val sizes = Array.tabulate[Long](3000) { i => i.toLong }
- val avg = sizes.sum / sizes.length
- val loc = BlockManagerId("a", "b", 10)
- val status = MapStatus(loc, sizes)
+ def compressAndDecompressMapStatus(status: MapStatus): MapStatus = {
val ser = new JavaSerializer(new SparkConf)
val buf = ser.newInstance().serialize(status)
- val status1 = ser.newInstance().deserialize[MapStatus](buf)
- assert(status1.location == loc)
- for (i <- 0 until 3000) {
- val estimate = status1.getSizeForBlock(i)
- assert(estimate === avg)
- }
+ ser.newInstance().deserialize[MapStatus](buf)
}
}