Diffstat (limited to 'core/src/main')
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala               |  29 +---
 core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala            | 119 ++++++++++++---
 core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala |   8 +-
 core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala |   3 +-
 4 files changed, 115 insertions, 44 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index f92189b707..4cb0bd4142 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -349,7 +349,6 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
 }
 
 private[spark] object MapOutputTracker {
-  private val LOG_BASE = 1.1
 
   // Serialize an array of map output locations into an efficient byte format so that we can send
   // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
@@ -385,34 +384,8 @@ private[spark] object MapOutputTracker {
         throw new MetadataFetchFailedException(
           shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
       } else {
-        (status.location, decompressSize(status.compressedSizes(reduceId)))
+        (status.location, status.getSizeForBlock(reduceId))
       }
     }
   }
-
-  /**
-   * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
-   * We do this by encoding the log base 1.1 of the size as an integer, which can support
-   * sizes up to 35 GB with at most 10% error.
-   */
-  def compressSize(size: Long): Byte = {
-    if (size == 0) {
-      0
-    } else if (size <= 1L) {
-      1
-    } else {
-      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
-    }
-  }
-
-  /**
-   * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
-   */
-  def decompressSize(compressedSize: Byte): Long = {
-    if (compressedSize == 0) {
-      0
-    } else {
-      math.pow(LOG_BASE, compressedSize & 0xFF).toLong
-    }
-  }
 }
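Note: compressSize and decompressSize are moved rather than deleted; they reappear in the MapStatus companion object in the next file. The encoding stores ceil(log base 1.1 of the size) in a single byte, and decoding computes 1.1^code, rounding up, so the reported size is at most about 10% above the true size, with code 255 capping the range near 35 GB. A self-contained sketch of the round-trip (the demo object and the sample sizes are illustrative, not part of the patch):

    object SizeEncodingDemo {
      private val LOG_BASE = 1.1

      // One byte holds ceil(log_1.1(size)), exactly as in MapStatus.compressSize.
      def compressSize(size: Long): Byte = {
        if (size == 0) 0
        else if (size <= 1L) 1
        else math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
      }

      // Reverse operation: 1.1^code, an estimate at most ~10% above the true size.
      def decompressSize(compressedSize: Byte): Long = {
        if (compressedSize == 0) 0
        else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
      }

      def main(args: Array[String]): Unit = {
        for (size <- Seq(1L, 1024L, 1000000L, 10L * 1024 * 1024 * 1024)) {
          val estimate = decompressSize(compressSize(size))
          println(f"$size%d -> $estimate%d (ratio ${estimate.toDouble / size}%.3f)")
        }
      }
    }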
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index d3f63ff92a..e25096ea92 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -24,22 +24,123 @@ import org.apache.spark.storage.BlockManagerId
 /**
  * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
  * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
- * The map output sizes are compressed using MapOutputTracker.compressSize.
  */
-private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
-  extends Externalizable {
+private[spark] sealed trait MapStatus {
+  /** Location where this task was run. */
+  def location: BlockManagerId
 
-  def this() = this(null, null)  // For deserialization only
+  /** Estimated size for the reduce block, in bytes. */
+  def getSizeForBlock(reduceId: Int): Long
+}
+
+
+private[spark] object MapStatus {
+
+  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
+    if (uncompressedSizes.length > 2000) {
+      new HighlyCompressedMapStatus(loc, uncompressedSizes)
+    } else {
+      new CompressedMapStatus(loc, uncompressedSizes)
+    }
+  }
+
+  private[this] val LOG_BASE = 1.1
+
+  /**
+   * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
+   * We do this by encoding the log base 1.1 of the size as an integer, which can support
+   * sizes up to 35 GB with at most 10% error.
+   */
+  def compressSize(size: Long): Byte = {
+    if (size == 0) {
+      0
+    } else if (size <= 1L) {
+      1
+    } else {
+      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
+    }
+  }
+
+  /**
+   * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
+   */
+  def decompressSize(compressedSize: Byte): Long = {
+    if (compressedSize == 0) {
+      0
+    } else {
+      math.pow(LOG_BASE, compressedSize & 0xFF).toLong
+    }
+  }
+}
+
+
+/**
+ * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is
+ * represented using a single byte.
+ *
+ * @param loc location where the task is being executed.
+ * @param compressedSizes size of the blocks, indexed by reduce partition id.
+ */
+private[spark] class CompressedMapStatus(
+    private[this] var loc: BlockManagerId,
+    private[this] var compressedSizes: Array[Byte])
+  extends MapStatus with Externalizable {
+
+  protected def this() = this(null, null.asInstanceOf[Array[Byte]])  // For deserialization only
+
+  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+    this(loc, uncompressedSizes.map(MapStatus.compressSize))
+  }
 
-  def writeExternal(out: ObjectOutput) {
-    location.writeExternal(out)
+  override def location: BlockManagerId = loc
+
+  override def getSizeForBlock(reduceId: Int): Long = {
+    MapStatus.decompressSize(compressedSizes(reduceId))
+  }
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    loc.writeExternal(out)
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)
   }
 
-  def readExternal(in: ObjectInput) {
-    location = BlockManagerId(in)
-    compressedSizes = new Array[Byte](in.readInt())
+  override def readExternal(in: ObjectInput): Unit = {
+    loc = BlockManagerId(in)
+    val len = in.readInt()
+    compressedSizes = new Array[Byte](len)
     in.readFully(compressedSizes)
   }
 }
+
+
+/**
+ * A [[MapStatus]] implementation that only stores the average size of the blocks.
+ *
+ * @param loc location where the task is being executed.
+ * @param avgSize average size of all the blocks
+ */
+private[spark] class HighlyCompressedMapStatus(
+    private[this] var loc: BlockManagerId,
+    private[this] var avgSize: Long)
+  extends MapStatus with Externalizable {
+
+  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+    this(loc, uncompressedSizes.sum / uncompressedSizes.length)
+  }
+
+  protected def this() = this(null, 0L)  // For deserialization only
+
+  override def location: BlockManagerId = loc
+
+  override def getSizeForBlock(reduceId: Int): Long = avgSize
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    loc.writeExternal(out)
+    out.writeLong(avgSize)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    loc = BlockManagerId(in)
+    avgSize = in.readLong()
+  }
+}
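MapStatus.apply above becomes the single construction point for shuffle writers: up to 2000 reduce partitions yields a CompressedMapStatus carrying one log-encoded byte per block, while anything larger yields a HighlyCompressedMapStatus carrying a single average, so the status stays O(1) in the number of partitions for very large stages. A usage sketch; buildStatus is hypothetical, and loc stands in for the blockManager.blockManagerId value used by the writers below:

    import org.apache.spark.scheduler.MapStatus
    import org.apache.spark.storage.BlockManagerId

    // Dispatch depends only on the partition count, not on data volume.
    def buildStatus(loc: BlockManagerId, sizes: Array[Long]): MapStatus =
      MapStatus(loc, sizes)

    // With 3000 partitions of 100 bytes each, the average-only implementation
    // is chosen and every reduce id reports the same estimate:
    //   val status = buildStatus(loc, Array.fill(3000)(100L))
    //   status.getSizeForBlock(0)     // 100
    //   status.getSizeForBlock(2999)  // 100

The trade-off is visible in getSizeForBlock: beyond the threshold, per-block accuracy is given up in exchange for a status whose serialized size no longer grows with the stage.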
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 4b9454d75a..746ed33b54 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -103,13 +103,11 @@ private[spark] class HashShuffleWriter[K, V](
 
   private def commitWritesAndBuildStatus(): MapStatus = {
     // Commit the writes. Get the size of each bucket block (total block size).
-    val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
+    val sizes: Array[Long] = shuffle.writers.map { writer: BlockObjectWriter =>
       writer.commitAndClose()
-      val size = writer.fileSegment().length
-      MapOutputTracker.compressSize(size)
+      writer.fileSegment().length
     }
-
-    new MapStatus(blockManager.blockManagerId, compressedSizes)
+    MapStatus(blockManager.blockManagerId, sizes)
   }
 
   private def revertWrites(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 89a78d6982..927481b72c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
     shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
 
-    mapStatus = new MapStatus(blockManager.blockManagerId,
-      partitionLengths.map(MapOutputTracker.compressSize))
+    mapStatus = MapStatus(blockManager.blockManagerId, partitionLengths)
   }
 
   /** Close this writer, passing along whether the map completed */
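Both writers now report raw block lengths and let MapStatus.apply choose the representation. The resulting status travels from executors to the driver through Java serialization, with Externalizable keeping the payload close to a location plus one byte per block (or plus a single long). A minimal round-trip sketch, assuming a status built as in the writers above:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    import org.apache.spark.scheduler.MapStatus

    // Serialize and deserialize a MapStatus the way it is shipped to the driver.
    def roundTrip(status: MapStatus): MapStatus = {
      val buffer = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buffer)
      out.writeObject(status)
      out.close()
      val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      in.readObject().asInstanceOf[MapStatus]
    }

    // For either implementation the size estimate survives the trip:
    //   roundTrip(status).getSizeForBlock(0) == status.getSizeForBlock(0)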