From cde5483884068b0ae1470b9b9b3ee54ab944ab12 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 8 May 2015 22:09:55 -0400 Subject: [SPARK-7375] [SQL] Avoid row copying in exchange when sort.serializeMapOutputs takes effect This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in #4450 / SPARK-4550). This patch also includes a change to copy the input to RangePartitioner partition bounds calculation, which is necessary because this calculation buffers mutable Java objects. [Review on Reviewable](https://reviewable.io/reviews/apache/spark/5948) Author: Josh Rosen Closes #5948 from JoshRosen/SPARK-7375 and squashes the following commits: f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange 899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375 6a6bfce [Josh Rosen] Fix issue related to RangePartitioning: ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect. --- .../org/apache/spark/sql/execution/Exchange.scala | 156 +++++++++++++-------- 1 file changed, 100 insertions(+), 56 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index f02fa81e95..c3d2c7019a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner} +import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv} import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.serializer.Serializer import org.apache.spark.sql.{SQLContext, Row} @@ -59,11 +59,62 @@ case class Exchange( override def output: Seq[Attribute] = child.output - /** We must copy rows when sort based shuffle is on */ - protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] - - private val bypassMergeThreshold = - child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) + /** + * Determines whether records must be defensively copied before being sent to the shuffle. + * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The + * shuffle code assumes that objects are immutable and hence does not perform its own defensive + * copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In + * order to properly shuffle the output of these operators, we need to perform our own copying + * prior to sending records to the shuffle. This copying is expensive, so we try to avoid it + * whenever possible. This method encapsulates the logic for choosing when to copy. + * + * In the long run, we might want to push this logic into core's shuffle APIs so that we don't + * have to rely on knowledge of core internals here in SQL. + * + * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue. + * + * @param partitioner the partitioner for the shuffle + * @param serializer the serializer that will be used to write rows + * @return true if rows should be copied before being shuffled, false otherwise + */ + private def needToCopyObjectsBeforeShuffle( + partitioner: Partitioner, + serializer: Serializer): Boolean = { + // Note: even though we only use the partitioner's `numPartitions` field, we require it to be + // passed instead of directly passing the number of partitions in order to guard against + // corner-cases where a partitioner constructed with `numPartitions` partitions may output + // fewer partitions (like RangePartitioner, for example). + val conf = child.sqlContext.sparkContext.conf + val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] + val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) + val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) + if (newOrdering.nonEmpty) { + // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`, + // which requires a defensive copy. + true + } else if (sortBasedShuffleOn) { + // Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory. + // However, there are two special cases where we can avoid the copy, described below: + if (partitioner.numPartitions <= bypassMergeThreshold) { + // If the number of output partitions is sufficiently small, then Spark will fall back to + // the old hash-based shuffle write path which doesn't buffer deserialized records. + // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass. + false + } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) { + // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting + // them. This optimization is guarded by a feature-flag and is only applied in cases where + // shuffle dependency does not specify an ordering and the record serializer has certain + // properties. If this optimization is enabled, we can safely avoid the copy. + false + } else { + // None of the special cases held, so we must copy. + true + } + } else { + // We're using hash-based shuffle, so we don't need to copy. + false + } + } private val keyOrdering = { if (newOrdering.nonEmpty) { @@ -81,7 +132,7 @@ case class Exchange( @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf - def serializer( + private def getSerializer( keySchema: Array[DataType], valueSchema: Array[DataType], hasKeyOrdering: Boolean, @@ -112,17 +163,12 @@ case class Exchange( protected override def doExecute(): RDD[Row] = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => - // TODO: Eliminate redundant expressions in grouping key and value. - // This is a workaround for SPARK-4479. When: - // 1. sort based shuffle is on, and - // 2. the partition number is under the merge threshold, and - // 3. no ordering is required - // we can avoid the defensive copies to improve performance. In the long run, we probably - // want to include information in shuffle dependencies to indicate whether elements in the - // source RDD should be copied. - val willMergeSort = sortBasedShuffleOn && numPartitions > bypassMergeThreshold - - val rdd = if (willMergeSort || newOrdering.nonEmpty) { + val keySchema = expressions.map(_.dataType).toArray + val valueSchema = child.output.map(_.dataType).toArray + val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions) + val part = new HashPartitioner(numPartitions) + + val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) { child.execute().mapPartitions { iter => val hashExpressions = newMutableProjection(expressions, child.output)() iter.map(r => (hashExpressions(r).copy(), r.copy())) @@ -134,52 +180,52 @@ case class Exchange( iter.map(r => mutablePair.update(hashExpressions(r), r)) } } - val part = new HashPartitioner(numPartitions) - val shuffled = - if (newOrdering.nonEmpty) { - new ShuffledRDD[Row, Row, Row](rdd, part).setKeyOrdering(keyOrdering) - } else { - new ShuffledRDD[Row, Row, Row](rdd, part) - } - val keySchema = expressions.map(_.dataType).toArray - val valueSchema = child.output.map(_.dataType).toArray - shuffled.setSerializer( - serializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)) - + val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part) + if (newOrdering.nonEmpty) { + shuffled.setKeyOrdering(keyOrdering) + } + shuffled.setSerializer(serializer) shuffled.map(_._2) case RangePartitioning(sortingExpressions, numPartitions) => - val rdd = if (sortBasedShuffleOn || newOrdering.nonEmpty) { - child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))} - } else { - child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Row, Null](null, null) - iter.map(row => mutablePair.update(row, null)) + val keySchema = child.output.map(_.dataType).toArray + val serializer = getSerializer(keySchema, null, newOrdering.nonEmpty, numPartitions) + + val childRdd = child.execute() + val part: Partitioner = { + // Internally, RangePartitioner runs a job on the RDD that samples keys to compute + // partition bounds. To get accurate samples, we need to copy the mutable keys. + val rddForSampling = childRdd.mapPartitions { iter => + val mutablePair = new MutablePair[Row, Null]() + iter.map(row => mutablePair.update(row.copy(), null)) } + // TODO: RangePartitioner should take an Ordering. + implicit val ordering = new RowOrdering(sortingExpressions, child.output) + new RangePartitioner(numPartitions, rddForSampling, ascending = true) } - // TODO: RangePartitioner should take an Ordering. - implicit val ordering = new RowOrdering(sortingExpressions, child.output) - - val part = new RangePartitioner(numPartitions, rdd, ascending = true) - val shuffled = - if (newOrdering.nonEmpty) { - new ShuffledRDD[Row, Null, Null](rdd, part).setKeyOrdering(keyOrdering) - } else { - new ShuffledRDD[Row, Null, Null](rdd, part) + val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) { + childRdd.mapPartitions { iter => iter.map(row => (row.copy(), null))} + } else { + childRdd.mapPartitions { iter => + val mutablePair = new MutablePair[Row, Null]() + iter.map(row => mutablePair.update(row, null)) } - val keySchema = child.output.map(_.dataType).toArray - shuffled.setSerializer( - serializer(keySchema, null, newOrdering.nonEmpty, numPartitions)) + } + val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part) + if (newOrdering.nonEmpty) { + shuffled.setKeyOrdering(keyOrdering) + } + shuffled.setSerializer(serializer) shuffled.map(_._1) case SinglePartition => - // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since - // operators like `TakeOrdered` may require an ordering within the partition, and currently - // `SinglePartition` doesn't include ordering information. - // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered` - val rdd = if (sortBasedShuffleOn) { + val valueSchema = child.output.map(_.dataType).toArray + val serializer = getSerializer(null, valueSchema, hasKeyOrdering = false, 1) + val partitioner = new HashPartitioner(1) + + val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) { child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) } } else { child.execute().mapPartitions { iter => @@ -187,10 +233,8 @@ case class Exchange( iter.map(r => mutablePair.update(null, r)) } } - val partitioner = new HashPartitioner(1) val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner) - val valueSchema = child.output.map(_.dataType).toArray - shuffled.setSerializer(serializer(null, valueSchema, false, 1)) + shuffled.setSerializer(serializer) shuffled.map(_._2) case _ => sys.error(s"Exchange not implemented for $newPartitioning") -- cgit v1.2.3