aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-09-29 22:56:22 -0700
committerReynold Xin <rxin@apache.org>2014-09-29 22:56:22 -0700
commit6b79bfb42580b6bd4c4cd99fb521534a94150693 (patch)
treed2cf3ee0b180a83062f529530f2de7e13b6c2391 /core/src/main/scala
parent210404a56197ad347f1e621ed53ef01327fba2bd (diff)
downloadspark-6b79bfb42580b6bd4c4cd99fb521534a94150693.tar.gz
spark-6b79bfb42580b6bd4c4cd99fb521534a94150693.tar.bz2
spark-6b79bfb42580b6bd4c4cd99fb521534a94150693.zip
[SPARK-3613] Record only average block size in MapStatus for large stages
This changes the way we send MapStatus from executors back to driver for large stages (>2000 tasks). For large stages, we no longer send one byte per block. Instead, we just send the average block size. This makes large jobs (tens of thousands of tasks) much more reliable since the driver no longer sends huge amount of data. Author: Reynold Xin <rxin@apache.org> Closes #2470 from rxin/mapstatus and squashes the following commits: 822ff54 [Reynold Xin] Code review feedback. 3b86f56 [Reynold Xin] Added MimaExclude. f89d182 [Reynold Xin] Fixed a bug in MapStatus 6a0401c [Reynold Xin] [SPARK-3613] Record only average block size in MapStatus for large stages.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala119
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala3
4 files changed, 115 insertions, 44 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index f92189b707..4cb0bd4142 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -349,7 +349,6 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
}
private[spark] object MapOutputTracker {
- private val LOG_BASE = 1.1
// Serialize an array of map output locations into an efficient byte format so that we can send
// it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -385,34 +384,8 @@ private[spark] object MapOutputTracker {
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
- (status.location, decompressSize(status.compressedSizes(reduceId)))
+ (status.location, status.getSizeForBlock(reduceId))
}
}
}
-
- /**
- * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
- * We do this by encoding the log base 1.1 of the size as an integer, which can support
- * sizes up to 35 GB with at most 10% error.
- */
- def compressSize(size: Long): Byte = {
- if (size == 0) {
- 0
- } else if (size <= 1L) {
- 1
- } else {
- math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
- }
- }
-
- /**
- * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
- */
- def decompressSize(compressedSize: Byte): Long = {
- if (compressedSize == 0) {
- 0
- } else {
- math.pow(LOG_BASE, compressedSize & 0xFF).toLong
- }
- }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index d3f63ff92a..e25096ea92 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -24,22 +24,123 @@ import org.apache.spark.storage.BlockManagerId
/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
- * The map output sizes are compressed using MapOutputTracker.compressSize.
*/
-private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
- extends Externalizable {
+private[spark] sealed trait MapStatus {
+ /** Location where this task was run. */
+ def location: BlockManagerId
- def this() = this(null, null) // For deserialization only
+ /** Estimated size for the reduce block, in bytes. */
+ def getSizeForBlock(reduceId: Int): Long
+}
+
+
+private[spark] object MapStatus {
+
+ def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
+ if (uncompressedSizes.length > 2000) {
+ new HighlyCompressedMapStatus(loc, uncompressedSizes)
+ } else {
+ new CompressedMapStatus(loc, uncompressedSizes)
+ }
+ }
+
+ private[this] val LOG_BASE = 1.1
+
+ /**
+ * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
+ * We do this by encoding the log base 1.1 of the size as an integer, which can support
+ * sizes up to 35 GB with at most 10% error.
+ */
+ def compressSize(size: Long): Byte = {
+ if (size == 0) {
+ 0
+ } else if (size <= 1L) {
+ 1
+ } else {
+ math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
+ }
+ }
+
+ /**
+ * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
+ */
+ def decompressSize(compressedSize: Byte): Long = {
+ if (compressedSize == 0) {
+ 0
+ } else {
+ math.pow(LOG_BASE, compressedSize & 0xFF).toLong
+ }
+ }
+}
+
+
+/**
+ * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is
+ * represented using a single byte.
+ *
+ * @param loc location where the task is being executed.
+ * @param compressedSizes size of the blocks, indexed by reduce partition id.
+ */
+private[spark] class CompressedMapStatus(
+ private[this] var loc: BlockManagerId,
+ private[this] var compressedSizes: Array[Byte])
+ extends MapStatus with Externalizable {
+
+ protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only
+
+ def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+ this(loc, uncompressedSizes.map(MapStatus.compressSize))
+ }
- def writeExternal(out: ObjectOutput) {
- location.writeExternal(out)
+ override def location: BlockManagerId = loc
+
+ override def getSizeForBlock(reduceId: Int): Long = {
+ MapStatus.decompressSize(compressedSizes(reduceId))
+ }
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ loc.writeExternal(out)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}
- def readExternal(in: ObjectInput) {
- location = BlockManagerId(in)
- compressedSizes = new Array[Byte](in.readInt())
+ override def readExternal(in: ObjectInput): Unit = {
+ loc = BlockManagerId(in)
+ val len = in.readInt()
+ compressedSizes = new Array[Byte](len)
in.readFully(compressedSizes)
}
}
+
+
+/**
+ * A [[MapStatus]] implementation that only stores the average size of the blocks.
+ *
+ * @param loc location where the task is being executed.
+ * @param avgSize average size of all the blocks
+ */
+private[spark] class HighlyCompressedMapStatus(
+ private[this] var loc: BlockManagerId,
+ private[this] var avgSize: Long)
+ extends MapStatus with Externalizable {
+
+ def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+ this(loc, uncompressedSizes.sum / uncompressedSizes.length)
+ }
+
+ protected def this() = this(null, 0L) // For deserialization only
+
+ override def location: BlockManagerId = loc
+
+ override def getSizeForBlock(reduceId: Int): Long = avgSize
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ loc.writeExternal(out)
+ out.writeLong(avgSize)
+ }
+
+ override def readExternal(in: ObjectInput): Unit = {
+ loc = BlockManagerId(in)
+ avgSize = in.readLong()
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 4b9454d75a..746ed33b54 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -103,13 +103,11 @@ private[spark] class HashShuffleWriter[K, V](
private def commitWritesAndBuildStatus(): MapStatus = {
// Commit the writes. Get the size of each bucket block (total block size).
- val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
+ val sizes: Array[Long] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commitAndClose()
- val size = writer.fileSegment().length
- MapOutputTracker.compressSize(size)
+ writer.fileSegment().length
}
-
- new MapStatus(blockManager.blockManagerId, compressedSizes)
+ MapStatus(blockManager.blockManagerId, sizes)
}
private def revertWrites(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 89a78d6982..927481b72c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
- mapStatus = new MapStatus(blockManager.blockManagerId,
- partitionLengths.map(MapOutputTracker.compressSize))
+ mapStatus = MapStatus(blockManager.blockManagerId, partitionLengths)
}
/** Close this writer, passing along whether the map completed */