Diffstat (limited to 'core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala | 175
1 file changed, 157 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 9df4e55166..1105167d39 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -19,9 +19,53 @@ package org.apache.spark.shuffle.sort
import java.util.concurrent.ConcurrentHashMap
-import org.apache.spark.{Logging, SparkConf, TaskContext, ShuffleDependency}
+import org.apache.spark._
+import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle._
+/**
+ * In sort-based shuffle, incoming records are sorted according to their target partition ids, then
+ * written to a single map output file. Reducers fetch contiguous regions of this file in order to
+ * read their portion of the map output. In cases where the map output data is too large to fit in
+ * memory, sorted subsets of the output are spilled to disk and those on-disk files are merged
+ * to produce the final output file.
+ *
+ * Sort-based shuffle has two different write paths for producing its map output files:
+ *
+ * - Serialized sorting: used when all three of the following conditions hold:
+ * 1. The shuffle dependency specifies no aggregation or output ordering.
+ * 2. The shuffle serializer supports relocation of serialized values (this is currently
+ * supported by KryoSerializer and Spark SQL's custom serializers).
+ * 3. The shuffle produces fewer than 16777216 output partitions.
+ * - Deserialized sorting: used to handle all other cases.
+ *
+ * -----------------------
+ * Serialized sorting mode
+ * -----------------------
+ *
+ * In the serialized sorting mode, incoming records are serialized as soon as they are passed to the
+ * shuffle writer and are buffered in a serialized form during sorting. This write path implements
+ * several optimizations:
+ *
+ * - Its sort operates on serialized binary data rather than Java objects, which reduces memory
+ * consumption and GC overheads. This optimization requires the record serializer to have certain
+ * properties to allow serialized records to be re-ordered without requiring deserialization.
+ * See SPARK-4550, where this optimization was first proposed and implemented, for more details.
+ *
+ * - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts
+ * arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
+ * record in the sorting array, the sorter can fit more of the array into cache.
+ *
+ * - The spill merging procedure operates on blocks of serialized records that belong to the same
+ * partition and does not need to deserialize records during the merge.
+ *
+ * - When the spill compression codec supports concatenation of compressed data, the spill merge
+ * simply concatenates the serialized and compressed spill partitions to produce the final output
+ * partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used
+ * and avoids the need to allocate decompression or copying buffers during the merge.
+ *
+ * For more details on these optimizations, see SPARK-7081.
+ */
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
if (!conf.getBoolean("spark.shuffle.spill", true)) {
@@ -30,8 +74,12 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
" Shuffle will continue to spill to disk when necessary.")
}
- private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
- private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()
+ /**
+ * A mapping from shuffle ids to the number of mappers producing output for those shuffles.
+ */
+ private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
+
+ override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
@@ -40,7 +88,22 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
- new BaseShuffleHandle(shuffleId, numMaps, dependency)
+ if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
+ // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
+ // need map-side aggregation, then write numPartitions files directly and just concatenate
+ // them at the end. This avoids doing serialization and deserialization twice to merge
+ // together the spilled files, which would happen with the normal code path. The downside is
+ // having multiple files open at a time and thus more memory allocated to buffers.
+ new BypassMergeSortShuffleHandle[K, V](
+ shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
+ } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
+ // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
+ new SerializedShuffleHandle[K, V](
+ shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
+ } else {
+ // Otherwise, buffer map outputs in a deserialized form:
+ new BaseShuffleHandle(shuffleId, numMaps, dependency)
+ }
}
/**
@@ -52,38 +115,114 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
- // We currently use the same block store shuffle fetcher as the hash-based shuffle.
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
/** Get a writer for a given partition. Called on executors by map tasks. */
- override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
- : ShuffleWriter[K, V] = {
- val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
- shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps)
- new SortShuffleWriter(
- shuffleBlockResolver, baseShuffleHandle, mapId, context)
+ override def getWriter[K, V](
+ handle: ShuffleHandle,
+ mapId: Int,
+ context: TaskContext): ShuffleWriter[K, V] = {
+ numMapsForShuffle.putIfAbsent(
+ handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
+ val env = SparkEnv.get
+ handle match {
+ case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
+ new UnsafeShuffleWriter(
+ env.blockManager,
+ shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
+ context.taskMemoryManager(),
+ env.shuffleMemoryManager,
+ unsafeShuffleHandle,
+ mapId,
+ context,
+ env.conf)
+ case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
+ new BypassMergeSortShuffleWriter(
+ env.blockManager,
+ shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
+ bypassMergeSortHandle,
+ mapId,
+ context,
+ env.conf)
+ case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
+ new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
+ }
}
/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
- if (shuffleMapNumber.containsKey(shuffleId)) {
- val numMaps = shuffleMapNumber.remove(shuffleId)
- (0 until numMaps).map{ mapId =>
+ Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
+ (0 until numMaps).foreach { mapId =>
shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
}
}
true
}
- override val shuffleBlockResolver: IndexShuffleBlockResolver = {
- indexShuffleBlockResolver
- }
-
/** Shut down this ShuffleManager. */
override def stop(): Unit = {
shuffleBlockResolver.stop()
}
}
+
+private[spark] object SortShuffleManager extends Logging {
+
+ /**
+ * The maximum number of shuffle output partitions that SortShuffleManager supports when
+ * buffering map outputs in a serialized form. This is an extreme defensive programming measure,
+ * since it's extremely unlikely that a single shuffle produces over 16 million output partitions.
+ */
+ val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE =
+ PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
+
+ /**
+ * Helper method for determining whether a shuffle should use an optimized serialized shuffle
+ * path or whether it should fall back to the original path that operates on deserialized objects.
+ */
+ def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
+ val shufId = dependency.shuffleId
+ val numPartitions = dependency.partitioner.numPartitions
+ val serializer = Serializer.getSerializer(dependency.serializer)
+ if (!serializer.supportsRelocationOfSerializedObjects) {
+ log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
+ s"${serializer.getClass.getName}, does not support object relocation")
+ false
+ } else if (dependency.aggregator.isDefined) {
+ log.debug(
+ s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
+ false
+ } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
+ log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
+ s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
+ false
+ } else {
+ log.debug(s"Can use serialized shuffle for shuffle $shufId")
+ true
+ }
+ }
+}
+
+/**
+ * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the
+ * serialized shuffle.
+ */
+private[spark] class SerializedShuffleHandle[K, V](
+ shuffleId: Int,
+ numMaps: Int,
+ dependency: ShuffleDependency[K, V, V])
+ extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
+}
+
+/**
+ * Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the
+ * bypass merge sort shuffle path.
+ */
+private[spark] class BypassMergeSortShuffleHandle[K, V](
+ shuffleId: Int,
+ numMaps: Int,
+ dependency: ShuffleDependency[K, V, V])
+ extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
+}