author     Reynold Xin <rxin@apache.org>    2014-09-29 22:56:22 -0700
committer  Reynold Xin <rxin@apache.org>    2014-09-29 22:56:22 -0700
commit     6b79bfb42580b6bd4c4cd99fb521534a94150693
tree       d2cf3ee0b180a83062f529530f2de7e13b6c2391
parent     210404a56197ad347f1e621ed53ef01327fba2bd
[SPARK-3613] Record only average block size in MapStatus for large stages
This changes the way we send MapStatus from executors back to the driver for large stages (> 2000 tasks). For large stages, we no longer send one byte per block. Instead, we just send the average block size. This makes large jobs (tens of thousands of tasks) much more reliable, since the driver no longer sends huge amounts of data.

Author: Reynold Xin <rxin@apache.org>

Closes #2470 from rxin/mapstatus and squashes the following commits:

822ff54 [Reynold Xin] Code review feedback.
3b86f56 [Reynold Xin] Added MimaExclude.
f89d182 [Reynold Xin] Fixed a bug in MapStatus
6a0401c [Reynold Xin] [SPARK-3613] Record only average block size in MapStatus for large stages.
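For context, here is a minimal standalone sketch of the size-reporting scheme this patch introduces (plain Scala, no Spark dependencies). The object and method names below (MapStatusSketch, report) are illustrative only; the actual implementation is org.apache.spark.scheduler.MapStatus in the diff further down. The 2000-partition threshold and the log-base-1.1 one-byte encoding are taken from the patch itself.

// Standalone sketch of the scheme: stages with more than 2000 reduce partitions
// report only the average block size; smaller stages keep one compressed byte
// per reduce partition (log base 1.1, at most ~10% error up to ~35 GB).
object MapStatusSketch {
  private val LogBase = 1.1

  /** Encode a block size into one byte, mirroring MapStatus.compressSize. */
  def compressSize(size: Long): Byte = {
    if (size == 0) 0
    else if (size <= 1L) 1
    else math.min(255, math.ceil(math.log(size) / math.log(LogBase)).toInt).toByte
  }

  /** Reverse operation, mirroring MapStatus.decompressSize. */
  def decompressSize(compressed: Byte): Long = {
    if (compressed == 0) 0 else math.pow(LogBase, compressed & 0xFF).toLong
  }

  /** Given per-reducer output sizes, return a lookup from reduceId to estimated size. */
  def report(uncompressedSizes: Array[Long]): Int => Long = {
    if (uncompressedSizes.length > 2000) {
      // Large stage: keep only the average, a single Long per map task.
      val avg = uncompressedSizes.sum / uncompressedSizes.length
      _ => avg
    } else {
      // Small stage: keep one compressed byte per reduce partition.
      val compressed = uncompressedSizes.map(compressSize)
      reduceId => decompressSize(compressed(reduceId))
    }
  }

  def main(args: Array[String]): Unit = {
    val small = report(Array(1000L, 10000L))
    val large = report(Array.fill(2001)(150L))
    println(small(1))  // slightly above 10000: one-byte encoding, within ~10%
    println(large(42)) // exactly 150: the average block size
  }
}

Running main prints a value a little above 10000 for the small case (the one-byte encoding overestimates, but stays within the documented 10% bound) and exactly 150 for the large case, since every block has the same size and only the average is retained.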
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala               |  29
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala             | 119
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala  |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala  |   3
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala           |  66
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala     |   2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala        |  92
-rw-r--r--  core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala             |  14
-rw-r--r--  project/MimaExcludes.scala                                                  |   5
9 files changed, 240 insertions, 98 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index f92189b707..4cb0bd4142 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -349,7 +349,6 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}
private[spark] object MapOutputTracker {
- private val LOG_BASE = 1.1
// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -385,34 +384,8 @@ private[spark] object MapOutputTracker {
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
- (status.location, decompressSize(status.compressedSizes(reduceId)))
+ (status.location, status.getSizeForBlock(reduceId))
}
}
}
-
- /**
- * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
- * We do this by encoding the log base 1.1 of the size as an integer, which can support
- * sizes up to 35 GB with at most 10% error.
- */
- def compressSize(size: Long): Byte = {
- if (size == 0) {
- 0
- } else if (size <= 1L) {
- 1
- } else {
- math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
- }
- }
-
- /**
- * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
- */
- def decompressSize(compressedSize: Byte): Long = {
- if (compressedSize == 0) {
- 0
- } else {
- math.pow(LOG_BASE, compressedSize & 0xFF).toLong
- }
- }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index d3f63ff92a..e25096ea92 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -24,22 +24,123 @@ import org.apache.spark.storage.BlockManagerId
/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
- * The map output sizes are compressed using MapOutputTracker.compressSize.
*/
-private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
- extends Externalizable {
+private[spark] sealed trait MapStatus {
+ /** Location where this task was run. */
+ def location: BlockManagerId
- def this() = this(null, null) // For deserialization only
+ /** Estimated size for the reduce block, in bytes. */
+ def getSizeForBlock(reduceId: Int): Long
+}
+
+
+private[spark] object MapStatus {
+
+ def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
+ if (uncompressedSizes.length > 2000) {
+ new HighlyCompressedMapStatus(loc, uncompressedSizes)
+ } else {
+ new CompressedMapStatus(loc, uncompressedSizes)
+ }
+ }
+
+ private[this] val LOG_BASE = 1.1
+
+ /**
+ * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
+ * We do this by encoding the log base 1.1 of the size as an integer, which can support
+ * sizes up to 35 GB with at most 10% error.
+ */
+ def compressSize(size: Long): Byte = {
+ if (size == 0) {
+ 0
+ } else if (size <= 1L) {
+ 1
+ } else {
+ math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
+ }
+ }
+
+ /**
+ * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
+ */
+ def decompressSize(compressedSize: Byte): Long = {
+ if (compressedSize == 0) {
+ 0
+ } else {
+ math.pow(LOG_BASE, compressedSize & 0xFF).toLong
+ }
+ }
+}
+
+
+/**
+ * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is
+ * represented using a single byte.
+ *
+ * @param loc location where the task is being executed.
+ * @param compressedSizes size of the blocks, indexed by reduce partition id.
+ */
+private[spark] class CompressedMapStatus(
+ private[this] var loc: BlockManagerId,
+ private[this] var compressedSizes: Array[Byte])
+ extends MapStatus with Externalizable {
+
+ protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only
+
+ def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+ this(loc, uncompressedSizes.map(MapStatus.compressSize))
+ }
- def writeExternal(out: ObjectOutput) {
- location.writeExternal(out)
+ override def location: BlockManagerId = loc
+
+ override def getSizeForBlock(reduceId: Int): Long = {
+ MapStatus.decompressSize(compressedSizes(reduceId))
+ }
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ loc.writeExternal(out)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}
- def readExternal(in: ObjectInput) {
- location = BlockManagerId(in)
- compressedSizes = new Array[Byte](in.readInt())
+ override def readExternal(in: ObjectInput): Unit = {
+ loc = BlockManagerId(in)
+ val len = in.readInt()
+ compressedSizes = new Array[Byte](len)
in.readFully(compressedSizes)
}
}
+
+
+/**
+ * A [[MapStatus]] implementation that only stores the average size of the blocks.
+ *
+ * @param loc location where the task is being executed.
+ * @param avgSize average size of all the blocks
+ */
+private[spark] class HighlyCompressedMapStatus(
+ private[this] var loc: BlockManagerId,
+ private[this] var avgSize: Long)
+ extends MapStatus with Externalizable {
+
+ def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+ this(loc, uncompressedSizes.sum / uncompressedSizes.length)
+ }
+
+ protected def this() = this(null, 0L) // For deserialization only
+
+ override def location: BlockManagerId = loc
+
+ override def getSizeForBlock(reduceId: Int): Long = avgSize
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ loc.writeExternal(out)
+ out.writeLong(avgSize)
+ }
+
+ override def readExternal(in: ObjectInput): Unit = {
+ loc = BlockManagerId(in)
+ avgSize = in.readLong()
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 4b9454d75a..746ed33b54 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -103,13 +103,11 @@ private[spark] class HashShuffleWriter[K, V](
private def commitWritesAndBuildStatus(): MapStatus = {
// Commit the writes. Get the size of each bucket block (total block size).
- val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
+ val sizes: Array[Long] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commitAndClose()
- val size = writer.fileSegment().length
- MapOutputTracker.compressSize(size)
+ writer.fileSegment().length
}
-
- new MapStatus(blockManager.blockManagerId, compressedSizes)
+ MapStatus(blockManager.blockManagerId, sizes)
}
private def revertWrites(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 89a78d6982..927481b72c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
- mapStatus = new MapStatus(blockManager.blockManagerId,
- partitionLengths.map(MapOutputTracker.compressSize))
+ mapStatus = MapStatus(blockManager.blockManagerId, partitionLengths)
}
/** Close this writer, passing along whether the map completed */
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 5369169811..1fef79ad10 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -23,32 +23,13 @@ import akka.actor._
import akka.testkit.TestActorRef
import org.scalatest.FunSuite
-import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils
class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
private val conf = new SparkConf
- test("compressSize") {
- assert(MapOutputTracker.compressSize(0L) === 0)
- assert(MapOutputTracker.compressSize(1L) === 1)
- assert(MapOutputTracker.compressSize(2L) === 8)
- assert(MapOutputTracker.compressSize(10L) === 25)
- assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145)
- assert((MapOutputTracker.compressSize(1000000000L) & 0xFF) === 218)
- // This last size is bigger than we can encode in a byte, so check that we just return 255
- assert((MapOutputTracker.compressSize(1000000000000000000L) & 0xFF) === 255)
- }
-
- test("decompressSize") {
- assert(MapOutputTracker.decompressSize(0) === 0)
- for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
- val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size))
- assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
- "size " + size + " decompressed to " + size2 + ", which is out of range")
- }
- }
test("master start and stop") {
val actorSystem = ActorSystem("test")
@@ -65,14 +46,12 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.registerShuffle(10, 2)
assert(tracker.containsShuffle(10))
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val compressedSize10000 = MapOutputTracker.compressSize(10000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
- Array(compressedSize1000, compressedSize10000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
- Array(compressedSize10000, compressedSize1000)))
+ val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+ val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
+ tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+ Array(1000L, 10000L)))
+ tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
+ Array(10000L, 1000L)))
val statuses = tracker.getServerStatuses(10, 0)
assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
(BlockManagerId("b", "hostB", 1000), size10000)))
@@ -84,11 +63,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.registerShuffle(10, 2)
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val compressedSize10000 = MapOutputTracker.compressSize(10000L)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
+ val compressedSize1000 = MapStatus.compressSize(1000L)
+ val compressedSize10000 = MapStatus.compressSize(10000L)
+ tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize10000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
+ tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000)))
assert(tracker.containsShuffle(10))
assert(tracker.getServerStatuses(10, 0).nonEmpty)
@@ -103,11 +82,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
tracker.trackerActor =
actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.registerShuffle(10, 2)
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val compressedSize10000 = MapOutputTracker.compressSize(10000L)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
+ val compressedSize1000 = MapStatus.compressSize(1000L)
+ val compressedSize10000 = MapStatus.compressSize(10000L)
+ tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize1000, compressedSize1000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
+ tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000, compressedSize1000)))
// As if we had two simultaneous fetch failures
@@ -142,10 +121,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
slaveTracker.updateEpoch(masterTracker.getEpoch)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+ val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+ masterTracker.registerMapOutput(10, 0, MapStatus(
+ BlockManagerId("a", "hostA", 1000), Array(1000L)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
@@ -173,8 +151,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// Frame size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("88", "mph", 1000), Array.fill[Byte](10)(0)))
+ masterTracker.registerMapOutput(10, 0, MapStatus(
+ BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
masterActor.receive(GetMapOutputStatuses(10))
}
@@ -194,8 +172,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// being sent.
masterTracker.registerShuffle(20, 100)
(0 until 100).foreach { i =>
- masterTracker.registerMapOutput(20, i, new MapStatus(
- BlockManagerId("999", "mps", 1000), Array.fill[Byte](4000000)(0)))
+ masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
+ BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
}
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index aa73469b6a..a2e4f712db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -740,7 +740,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
private def makeMapStatus(host: String, reduces: Int): MapStatus =
- new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2))
+ MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
new file mode 100644
index 0000000000..79e04f046e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.storage.BlockManagerId
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
+
+
+class MapStatusSuite extends FunSuite {
+
+ test("compressSize") {
+ assert(MapStatus.compressSize(0L) === 0)
+ assert(MapStatus.compressSize(1L) === 1)
+ assert(MapStatus.compressSize(2L) === 8)
+ assert(MapStatus.compressSize(10L) === 25)
+ assert((MapStatus.compressSize(1000000L) & 0xFF) === 145)
+ assert((MapStatus.compressSize(1000000000L) & 0xFF) === 218)
+ // This last size is bigger than we can encode in a byte, so check that we just return 255
+ assert((MapStatus.compressSize(1000000000000000000L) & 0xFF) === 255)
+ }
+
+ test("decompressSize") {
+ assert(MapStatus.decompressSize(0) === 0)
+ for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
+ val size2 = MapStatus.decompressSize(MapStatus.compressSize(size))
+ assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
+ "size " + size + " decompressed to " + size2 + ", which is out of range")
+ }
+ }
+
+ test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) {
+ val sizes = Array.fill[Long](2001)(150L)
+ val status = MapStatus(null, sizes)
+ assert(status.isInstanceOf[HighlyCompressedMapStatus])
+ assert(status.getSizeForBlock(10) === 150L)
+ assert(status.getSizeForBlock(50) === 150L)
+ assert(status.getSizeForBlock(99) === 150L)
+ assert(status.getSizeForBlock(2000) === 150L)
+ }
+
+ test(classOf[HighlyCompressedMapStatus].getName + ": estimated size is within 10%") {
+ val sizes = Array.tabulate[Long](50) { i => i.toLong }
+ val loc = BlockManagerId("a", "b", 10)
+ val status = MapStatus(loc, sizes)
+ val ser = new JavaSerializer(new SparkConf)
+ val buf = ser.newInstance().serialize(status)
+ val status1 = ser.newInstance().deserialize[MapStatus](buf)
+ assert(status1.location == loc)
+ for (i <- 0 until sizes.length) {
+ // make sure the estimated size is within 10% of the input; note that we skip the very small
+ // sizes because the compression is very lossy there.
+ val estimate = status1.getSizeForBlock(i)
+ if (estimate > 100) {
+ assert(math.abs(estimate - sizes(i)) * 10 <= sizes(i),
+ s"incorrect estimated size $estimate, original was ${sizes(i)}")
+ }
+ }
+ }
+
+ test(classOf[HighlyCompressedMapStatus].getName + ": estimated size should be the average size") {
+ val sizes = Array.tabulate[Long](3000) { i => i.toLong }
+ val avg = sizes.sum / sizes.length
+ val loc = BlockManagerId("a", "b", 10)
+ val status = MapStatus(loc, sizes)
+ val ser = new JavaSerializer(new SparkConf)
+ val buf = ser.newInstance().serialize(status)
+ val status1 = ser.newInstance().deserialize[MapStatus](buf)
+ assert(status1.location == loc)
+ for (i <- 0 until 3000) {
+ val estimate = status1.getSizeForBlock(i)
+ assert(estimate === avg)
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 76bf4cfd11..7bca1711ae 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -106,10 +106,9 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+ val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+ masterTracker.registerMapOutput(10, 0,
+ MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
@@ -157,10 +156,9 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+ val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+ masterTracker.registerMapOutput(10, 0, MapStatus(
+ BlockManagerId("a", "hostA", 1000), Array(1000L)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1adfaa18c6..4076ebc6fc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -45,7 +45,10 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"),
ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2")
+ "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2"),
+ // MapStatus should be private[spark]
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.scheduler.MapStatus")
)
case v if v.startsWith("1.1") =>