 core/src/main/scala/org/apache/spark/MapOutputTracker.scala               |  29
 core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala            | 119
 core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala |   8
 core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala |   3
 core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala          |  66
 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala    |   2
 core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala       |  92
 core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala            |  14
 project/MimaExcludes.scala                                                |   5
 9 files changed, 240 insertions(+), 98 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index f92189b707..4cb0bd4142 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -349,7 +349,6 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}
private[spark] object MapOutputTracker {
- private val LOG_BASE = 1.1
// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -385,34 +384,8 @@ private[spark] object MapOutputTracker {
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
- (status.location, decompressSize(status.compressedSizes(reduceId)))
+ (status.location, status.getSizeForBlock(reduceId))
}
}
}
-
- /**
- * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
- * We do this by encoding the log base 1.1 of the size as an integer, which can support
- * sizes up to 35 GB with at most 10% error.
- */
- def compressSize(size: Long): Byte = {
- if (size == 0) {
- 0
- } else if (size <= 1L) {
- 1
- } else {
- math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
- }
- }
-
- /**
- * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
- */
- def decompressSize(compressedSize: Byte): Long = {
- if (compressedSize == 0) {
- 0
- } else {
- math.pow(LOG_BASE, compressedSize & 0xFF).toLong
- }
- }
}
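
The encoding removed here moves verbatim into MapStatus.scala (next file). For reference, a minimal standalone sketch of the log-base-1.1 scheme; the object and method names below are illustrative, not Spark API:

    object SizeEncodingSketch {
      private val LogBase = 1.1

      // Encode a byte count into 8 bits: ceil(log_1.1(size)), clamped to 255.
      def compress(size: Long): Byte =
        if (size == 0) 0
        else if (size <= 1L) 1
        else math.min(255, math.ceil(math.log(size) / math.log(LogBase)).toInt).toByte

      // Reverse operation; lossy, but within roughly 10% of the original size.
      def decompress(b: Byte): Long =
        if (b == 0) 0 else math.pow(LogBase, b & 0xFF).toLong

      def main(args: Array[String]): Unit = {
        for (size <- Seq(2L, 1000L, 1000000L, 1000000000L)) {
          println(s"$size bytes -> code ${compress(size) & 0xFF} -> ~${decompress(compress(size))} bytes")
        }
      }
    }
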
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index d3f63ff92a..e25096ea92 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -24,22 +24,123 @@ import org.apache.spark.storage.BlockManagerId
/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
- * The map output sizes are compressed using MapOutputTracker.compressSize.
*/
-private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
- extends Externalizable {
+private[spark] sealed trait MapStatus {
+ /** Location where this task was run. */
+ def location: BlockManagerId
- def this() = this(null, null) // For deserialization only
+ /** Estimated size for the reduce block, in bytes. */
+ def getSizeForBlock(reduceId: Int): Long
+}
+
+
+private[spark] object MapStatus {
+
+ def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
+ if (uncompressedSizes.length > 2000) {
+ new HighlyCompressedMapStatus(loc, uncompressedSizes)
+ } else {
+ new CompressedMapStatus(loc, uncompressedSizes)
+ }
+ }
+
+ private[this] val LOG_BASE = 1.1
+
+ /**
+ * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
+ * We do this by encoding the log base 1.1 of the size as an integer, which can support
+ * sizes up to 35 GB with at most 10% error.
+ */
+ def compressSize(size: Long): Byte = {
+ if (size == 0) {
+ 0
+ } else if (size <= 1L) {
+ 1
+ } else {
+ math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
+ }
+ }
+
+ /**
+ * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
+ */
+ def decompressSize(compressedSize: Byte): Long = {
+ if (compressedSize == 0) {
+ 0
+ } else {
+ math.pow(LOG_BASE, compressedSize & 0xFF).toLong
+ }
+ }
+}
+
+
+/**
+ * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is
+ * represented using a single byte.
+ *
+ * @param loc location where the task is being executed.
+ * @param compressedSizes size of the blocks, indexed by reduce partition id.
+ */
+private[spark] class CompressedMapStatus(
+ private[this] var loc: BlockManagerId,
+ private[this] var compressedSizes: Array[Byte])
+ extends MapStatus with Externalizable {
+
+ protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only
+
+ def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+ this(loc, uncompressedSizes.map(MapStatus.compressSize))
+ }
- def writeExternal(out: ObjectOutput) {
- location.writeExternal(out)
+ override def location: BlockManagerId = loc
+
+ override def getSizeForBlock(reduceId: Int): Long = {
+ MapStatus.decompressSize(compressedSizes(reduceId))
+ }
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ loc.writeExternal(out)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}
- def readExternal(in: ObjectInput) {
- location = BlockManagerId(in)
- compressedSizes = new Array[Byte](in.readInt())
+ override def readExternal(in: ObjectInput): Unit = {
+ loc = BlockManagerId(in)
+ val len = in.readInt()
+ compressedSizes = new Array[Byte](len)
in.readFully(compressedSizes)
}
}
+
+
+/**
+ * A [[MapStatus]] implementation that only stores the average size of the blocks.
+ *
+ * @param loc location where the task is being executed.
+ * @param avgSize average size of all the blocks
+ */
+private[spark] class HighlyCompressedMapStatus(
+ private[this] var loc: BlockManagerId,
+ private[this] var avgSize: Long)
+ extends MapStatus with Externalizable {
+
+ def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+ this(loc, uncompressedSizes.sum / uncompressedSizes.length)
+ }
+
+ protected def this() = this(null, 0L) // For deserialization only
+
+ override def location: BlockManagerId = loc
+
+ override def getSizeForBlock(reduceId: Int): Long = avgSize
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ loc.writeExternal(out)
+ out.writeLong(avgSize)
+ }
+
+ override def readExternal(in: ObjectInput): Unit = {
+ loc = BlockManagerId(in)
+ avgSize = in.readLong()
+ }
+}
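
With both representations in place, a back-of-the-envelope sketch (illustrative, not Spark code) of why the factory above switches past 2000 partitions: the driver keeps one MapStatus per map task, so per-block bookkeeping grows as maps x reduces. The stage dimensions below are made up:

    object MapStatusFootprintSketch {
      def main(args: Array[String]): Unit = {
        val maps = 10000    // hypothetical stage dimensions
        val reduces = 10000
        // CompressedMapStatus: one byte per reduce partition, per map task.
        val perBlockBytes = maps.toLong * reduces
        // HighlyCompressedMapStatus: a single 8-byte average per map task.
        val avgOnlyBytes = maps.toLong * 8
        println(f"per-block: ${perBlockBytes / 1e6}%.0f MB, average-only: ${avgOnlyBytes / 1e6}%.2f MB")
      }
    }
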
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 4b9454d75a..746ed33b54 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -103,13 +103,11 @@ private[spark] class HashShuffleWriter[K, V](
private def commitWritesAndBuildStatus(): MapStatus = {
// Commit the writes. Get the size of each bucket block (total block size).
- val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
+ val sizes: Array[Long] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commitAndClose()
- val size = writer.fileSegment().length
- MapOutputTracker.compressSize(size)
+ writer.fileSegment().length
}
-
- new MapStatus(blockManager.blockManagerId, compressedSizes)
+ MapStatus(blockManager.blockManagerId, sizes)
}
private def revertWrites(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 89a78d6982..927481b72c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
- mapStatus = new MapStatus(blockManager.blockManagerId,
- partitionLengths.map(MapOutputTracker.compressSize))
+ mapStatus = MapStatus(blockManager.blockManagerId, partitionLengths)
}
/** Close this writer, passing along whether the map completed */
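
Both writers now hand raw byte lengths to the factory and no longer compress sizes themselves. A hypothetical sketch of the resulting call pattern (it would have to live in a private[spark] scope with Spark core on the classpath; the location values are made up):

    package org.apache.spark.scheduler

    import org.apache.spark.storage.BlockManagerId

    object WriterPatternSketch {
      def buildStatus(partitionLengths: Array[Long]): MapStatus = {
        val loc = BlockManagerId("exec-1", "host-a", 7337)
        // The factory picks CompressedMapStatus or HighlyCompressedMapStatus
        // from partitionLengths.length, centralizing the representation choice.
        MapStatus(loc, partitionLengths)
      }
    }
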
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 5369169811..1fef79ad10 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -23,32 +23,13 @@ import akka.actor._
import akka.testkit.TestActorRef
import org.scalatest.FunSuite
-import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils
class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
private val conf = new SparkConf
- test("compressSize") {
- assert(MapOutputTracker.compressSize(0L) === 0)
- assert(MapOutputTracker.compressSize(1L) === 1)
- assert(MapOutputTracker.compressSize(2L) === 8)
- assert(MapOutputTracker.compressSize(10L) === 25)
- assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145)
- assert((MapOutputTracker.compressSize(1000000000L) & 0xFF) === 218)
- // This last size is bigger than we can encode in a byte, so check that we just return 255
- assert((MapOutputTracker.compressSize(1000000000000000000L) & 0xFF) === 255)
- }
-
- test("decompressSize") {
- assert(MapOutputTracker.decompressSize(0) === 0)
- for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
- val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size))
- assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
- "size " + size + " decompressed to " + size2 + ", which is out of range")
- }
- }
test("master start and stop") {
val actorSystem = ActorSystem("test")
@@ -65,14 +46,12 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.registerShuffle(10, 2)
assert(tracker.containsShuffle(10))
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val compressedSize10000 = MapOutputTracker.compressSize(10000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
- Array(compressedSize1000, compressedSize10000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
- Array(compressedSize10000, compressedSize1000)))
+ val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+ val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
+ tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+ Array(1000L, 10000L)))
+ tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
+ Array(10000L, 1000L)))
val statuses = tracker.getServerStatuses(10, 0)
assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
(BlockManagerId("b", "hostB", 1000), size10000)))
@@ -84,11 +63,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val tracker = new MapOutputTrackerMaster(conf)
tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.registerShuffle(10, 2)
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val compressedSize10000 = MapOutputTracker.compressSize(10000L)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
+ val compressedSize1000 = MapStatus.compressSize(1000L)
+ val compressedSize10000 = MapStatus.compressSize(10000L)
+ tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize10000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
+ tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000)))
assert(tracker.containsShuffle(10))
assert(tracker.getServerStatuses(10, 0).nonEmpty)
@@ -103,11 +82,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
tracker.trackerActor =
actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.registerShuffle(10, 2)
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val compressedSize10000 = MapOutputTracker.compressSize(10000L)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
+ val compressedSize1000 = MapStatus.compressSize(1000L)
+ val compressedSize10000 = MapStatus.compressSize(10000L)
+ tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize1000, compressedSize1000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
+ tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000, compressedSize1000)))
// As if we had two simultaneous fetch failures
@@ -142,10 +121,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
slaveTracker.updateEpoch(masterTracker.getEpoch)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+ val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+ masterTracker.registerMapOutput(10, 0, MapStatus(
+ BlockManagerId("a", "hostA", 1000), Array(1000L)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
@@ -173,8 +151,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// Frame size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("88", "mph", 1000), Array.fill[Byte](10)(0)))
+ masterTracker.registerMapOutput(10, 0, MapStatus(
+ BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
masterActor.receive(GetMapOutputStatuses(10))
}
@@ -194,8 +172,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// being sent.
masterTracker.registerShuffle(20, 100)
(0 until 100).foreach { i =>
- masterTracker.registerMapOutput(20, i, new MapStatus(
- BlockManagerId("999", "mps", 1000), Array.fill[Byte](4000000)(0)))
+ masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
+ BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
}
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index aa73469b6a..a2e4f712db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -740,7 +740,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
private def makeMapStatus(host: String, reduces: Int): MapStatus =
- new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2))
+ MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
new file mode 100644
index 0000000000..79e04f046e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.storage.BlockManagerId
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
+
+
+class MapStatusSuite extends FunSuite {
+
+ test("compressSize") {
+ assert(MapStatus.compressSize(0L) === 0)
+ assert(MapStatus.compressSize(1L) === 1)
+ assert(MapStatus.compressSize(2L) === 8)
+ assert(MapStatus.compressSize(10L) === 25)
+ assert((MapStatus.compressSize(1000000L) & 0xFF) === 145)
+ assert((MapStatus.compressSize(1000000000L) & 0xFF) === 218)
+ // This last size is bigger than we can encode in a byte, so check that we just return 255
+ assert((MapStatus.compressSize(1000000000000000000L) & 0xFF) === 255)
+ }
+
+ test("decompressSize") {
+ assert(MapStatus.decompressSize(0) === 0)
+ for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
+ val size2 = MapStatus.decompressSize(MapStatus.compressSize(size))
+ assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
+ "size " + size + " decompressed to " + size2 + ", which is out of range")
+ }
+ }
+
+ test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) {
+ val sizes = Array.fill[Long](2001)(150L)
+ val status = MapStatus(null, sizes)
+ assert(status.isInstanceOf[HighlyCompressedMapStatus])
+ assert(status.getSizeForBlock(10) === 150L)
+ assert(status.getSizeForBlock(50) === 150L)
+ assert(status.getSizeForBlock(99) === 150L)
+ assert(status.getSizeForBlock(2000) === 150L)
+ }
+
+ test(classOf[HighlyCompressedMapStatus].getName + ": estimated size is within 10%") {
+ val sizes = Array.tabulate[Long](50) { i => i.toLong }
+ val loc = BlockManagerId("a", "b", 10)
+ val status = MapStatus(loc, sizes)
+ val ser = new JavaSerializer(new SparkConf)
+ val buf = ser.newInstance().serialize(status)
+ val status1 = ser.newInstance().deserialize[MapStatus](buf)
+ assert(status1.location == loc)
+ for (i <- 0 until sizes.length) {
+ // make sure the estimated size is within 10% of the input; note that we skip the very small
+ // sizes because the compression is very lossy there.
+ val estimate = status1.getSizeForBlock(i)
+ if (estimate > 100) {
+ assert(math.abs(estimate - sizes(i)) * 10 <= sizes(i),
+ s"incorrect estimated size $estimate, original was ${sizes(i)}")
+ }
+ }
+ }
+
+ test(classOf[HighlyCompressedMapStatus].getName + ": estimated size should be the average size") {
+ val sizes = Array.tabulate[Long](3000) { i => i.toLong }
+ val avg = sizes.sum / sizes.length
+ val loc = BlockManagerId("a", "b", 10)
+ val status = MapStatus(loc, sizes)
+ val ser = new JavaSerializer(new SparkConf)
+ val buf = ser.newInstance().serialize(status)
+ val status1 = ser.newInstance().deserialize[MapStatus](buf)
+ assert(status1.location == loc)
+ for (i <- 0 until 3000) {
+ val estimate = status1.getSizeForBlock(i)
+ assert(estimate === avg)
+ }
+ }
+}
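
One caveat the suite implies but does not test directly: because HighlyCompressedMapStatus returns the same average for every block, skewed outputs are misestimated per block even though the total is preserved. An illustrative sketch:

    object SkewSketch {
      def main(args: Array[String]): Unit = {
        // 2999 empty blocks plus one 3 MB block; the average hides the skew.
        val sizes = Array.fill[Long](2999)(0L) :+ 3000000L
        val avg = sizes.sum / sizes.length
        println(s"every block estimated at $avg bytes; actual max is ${sizes.max}")
      }
    }
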
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 76bf4cfd11..7bca1711ae 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -106,10 +106,9 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+ val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+ masterTracker.registerMapOutput(10, 0,
+ MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
@@ -157,10 +156,9 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
- val compressedSize1000 = MapOutputTracker.compressSize(1000L)
- val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
- masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+ val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+ masterTracker.registerMapOutput(10, 0, MapStatus(
+ BlockManagerId("a", "hostA", 1000), Array(1000L)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1adfaa18c6..4076ebc6fc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -45,7 +45,10 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"),
ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2")
+ "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2"),
+ // MapStatus should be private[spark]
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.scheduler.MapStatus")
)
case v if v.startsWith("1.1") =>