author     Josh Rosen <joshrosen@databricks.com>  2015-05-08 22:09:55 -0400
committer  Yin Huai <yhuai@databricks.com>        2015-05-08 22:09:55 -0400
commit     cde5483884068b0ae1470b9b9b3ee54ab944ab12 (patch)
tree       28d7d3f6cd5da3ae79e3fd1f0a53824775c03f51 /sql/core
parent     0a901dd3a1eb3fd459d45b771ce4ad2cfef2a944 (diff)
[SPARK-7375] [SQL] Avoid row copying in exchange when sort.serializeMapOutputs takes effect
This patch refactors the SQL `Exchange` operator's logic for determining whether map outputs need to be copied before being shuffled. As part of this change, we'll now avoid unnecessary copies in cases where sort-based shuffle operates on serialized map outputs (as in #4450 / SPARK-4550). This patch also includes a change to copy the input to the RangePartitioner partition-bounds calculation, which is necessary because that calculation buffers mutable Java objects.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #5948 from JoshRosen/SPARK-7375 and squashes the following commits:

f305ff3 [Josh Rosen] Reduce scope of some variables in Exchange
899e1d7 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-7375
6a6bfce [Josh Rosen] Fix issue related to RangePartitioning
ad006a4 [Josh Rosen] [SPARK-7375] Avoid defensive copying in exchange operator when sort.serializeMapOutputs takes effect.
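As a minimal, self-contained Scala illustration of the hazard this patch guards against (the MutableRecord class below is hypothetical, not Spark code): buffering reused mutable records without copying leaves every buffered slot pointing at the same object.

// A stand-in for Spark SQL's reused mutable Row.
final class MutableRecord(var value: Int)

val reused = new MutableRecord(0)
val records: Iterator[MutableRecord] = (1 to 3).iterator.map { i =>
  reused.value = i // the operator overwrites the record in place...
  reused           // ...and hands out the same instance every time
}

// Buffering, as sort-based shuffle or RangePartitioner sampling does:
val buffered = records.toArray
println(buffered.map(_.value).mkString(", ")) // prints "3, 3, 3", not "1, 2, 3"

// Copying each record before it is buffered (new MutableRecord(reused.value))
// would preserve "1, 2, 3", at the cost of one allocation per row.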
Diffstat (limited to 'sql/core')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 156
1 file changed, 100 insertions(+), 56 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index f02fa81e95..c3d2c7019a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner}
+import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.{SQLContext, Row}
@@ -59,11 +59,62 @@ case class Exchange(
override def output: Seq[Attribute] = child.output
- /** We must copy rows when sort based shuffle is on */
- protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-
- private val bypassMergeThreshold =
- child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+ /**
+ * Determines whether records must be defensively copied before being sent to the shuffle.
+ * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
+ * shuffle code assumes that objects are immutable and hence does not perform its own defensive
+ * copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In
+ * order to properly shuffle the output of these operators, we need to perform our own copying
+ * prior to sending records to the shuffle. This copying is expensive, so we try to avoid it
+ * whenever possible. This method encapsulates the logic for choosing when to copy.
+ *
+ * In the long run, we might want to push this logic into core's shuffle APIs so that we don't
+ * have to rely on knowledge of core internals here in SQL.
+ *
+ * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
+ *
+ * @param partitioner the partitioner for the shuffle
+ * @param serializer the serializer that will be used to write rows
+ * @return true if rows should be copied before being shuffled, false otherwise
+ */
+ private def needToCopyObjectsBeforeShuffle(
+ partitioner: Partitioner,
+ serializer: Serializer): Boolean = {
+ // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
+ // passed instead of directly passing the number of partitions in order to guard against
+ // corner-cases where a partitioner constructed with `numPartitions` partitions may output
+ // fewer partitions (like RangePartitioner, for example).
+ val conf = child.sqlContext.sparkContext.conf
+ val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+ val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+ val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
+ if (newOrdering.nonEmpty) {
+ // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
+ // which requires a defensive copy.
+ true
+ } else if (sortBasedShuffleOn) {
+ // Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory.
+ // However, there are two special cases where we can avoid the copy, described below:
+ if (partitioner.numPartitions <= bypassMergeThreshold) {
+ // If the number of output partitions is sufficiently small, then Spark will fall back to
+ // the old hash-based shuffle write path which doesn't buffer deserialized records.
+ // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
+ false
+ } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
+ // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
+ // them. This optimization is guarded by a feature-flag and is only applied in cases where
+ * the shuffle dependency does not specify an ordering and the record serializer has certain
+ // properties. If this optimization is enabled, we can safely avoid the copy.
+ false
+ } else {
+ // None of the special cases held, so we must copy.
+ true
+ }
+ } else {
+ // We're using hash-based shuffle, so we don't need to copy.
+ false
+ }
+ }
private val keyOrdering = {
if (newOrdering.nonEmpty) {
@@ -81,7 +132,7 @@ case class Exchange(
@transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
- def serializer(
+ private def getSerializer(
keySchema: Array[DataType],
valueSchema: Array[DataType],
hasKeyOrdering: Boolean,
@@ -112,17 +163,12 @@ case class Exchange(
protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
- // TODO: Eliminate redundant expressions in grouping key and value.
- // This is a workaround for SPARK-4479. When:
- // 1. sort based shuffle is on, and
- // 2. the partition number is under the merge threshold, and
- // 3. no ordering is required
- // we can avoid the defensive copies to improve performance. In the long run, we probably
- // want to include information in shuffle dependencies to indicate whether elements in the
- // source RDD should be copied.
- val willMergeSort = sortBasedShuffleOn && numPartitions > bypassMergeThreshold
-
- val rdd = if (willMergeSort || newOrdering.nonEmpty) {
+ val keySchema = expressions.map(_.dataType).toArray
+ val valueSchema = child.output.map(_.dataType).toArray
+ val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
+ val part = new HashPartitioner(numPartitions)
+
+ val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
child.execute().mapPartitions { iter =>
val hashExpressions = newMutableProjection(expressions, child.output)()
iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -134,52 +180,52 @@ case class Exchange(
iter.map(r => mutablePair.update(hashExpressions(r), r))
}
}
- val part = new HashPartitioner(numPartitions)
- val shuffled =
- if (newOrdering.nonEmpty) {
- new ShuffledRDD[Row, Row, Row](rdd, part).setKeyOrdering(keyOrdering)
- } else {
- new ShuffledRDD[Row, Row, Row](rdd, part)
- }
- val keySchema = expressions.map(_.dataType).toArray
- val valueSchema = child.output.map(_.dataType).toArray
- shuffled.setSerializer(
- serializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions))
-
+ val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
+ if (newOrdering.nonEmpty) {
+ shuffled.setKeyOrdering(keyOrdering)
+ }
+ shuffled.setSerializer(serializer)
shuffled.map(_._2)
case RangePartitioning(sortingExpressions, numPartitions) =>
- val rdd = if (sortBasedShuffleOn || newOrdering.nonEmpty) {
- child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))}
- } else {
- child.execute().mapPartitions { iter =>
- val mutablePair = new MutablePair[Row, Null](null, null)
- iter.map(row => mutablePair.update(row, null))
+ val keySchema = child.output.map(_.dataType).toArray
+ val serializer = getSerializer(keySchema, null, newOrdering.nonEmpty, numPartitions)
+
+ val childRdd = child.execute()
+ val part: Partitioner = {
+ // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
+ // partition bounds. To get accurate samples, we need to copy the mutable keys.
+ val rddForSampling = childRdd.mapPartitions { iter =>
+ val mutablePair = new MutablePair[Row, Null]()
+ iter.map(row => mutablePair.update(row.copy(), null))
}
+ // TODO: RangePartitioner should take an Ordering.
+ implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+ new RangePartitioner(numPartitions, rddForSampling, ascending = true)
}
- // TODO: RangePartitioner should take an Ordering.
- implicit val ordering = new RowOrdering(sortingExpressions, child.output)
-
- val part = new RangePartitioner(numPartitions, rdd, ascending = true)
- val shuffled =
- if (newOrdering.nonEmpty) {
- new ShuffledRDD[Row, Null, Null](rdd, part).setKeyOrdering(keyOrdering)
- } else {
- new ShuffledRDD[Row, Null, Null](rdd, part)
+ val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
+ childRdd.mapPartitions { iter => iter.map(row => (row.copy(), null))}
+ } else {
+ childRdd.mapPartitions { iter =>
+ val mutablePair = new MutablePair[Row, Null]()
+ iter.map(row => mutablePair.update(row, null))
}
- val keySchema = child.output.map(_.dataType).toArray
- shuffled.setSerializer(
- serializer(keySchema, null, newOrdering.nonEmpty, numPartitions))
+ }
+ val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
+ if (newOrdering.nonEmpty) {
+ shuffled.setKeyOrdering(keyOrdering)
+ }
+ shuffled.setSerializer(serializer)
shuffled.map(_._1)
case SinglePartition =>
- // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
- // operators like `TakeOrdered` may require an ordering within the partition, and currently
- // `SinglePartition` doesn't include ordering information.
- // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
- val rdd = if (sortBasedShuffleOn) {
+ val valueSchema = child.output.map(_.dataType).toArray
+ val serializer = getSerializer(null, valueSchema, hasKeyOrdering = false, 1)
+ val partitioner = new HashPartitioner(1)
+
+ val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
} else {
child.execute().mapPartitions { iter =>
@@ -187,10 +233,8 @@ case class Exchange(
iter.map(r => mutablePair.update(null, r))
}
}
- val partitioner = new HashPartitioner(1)
val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
- val valueSchema = child.output.map(_.dataType).toArray
- shuffled.setSerializer(serializer(null, valueSchema, false, 1))
+ shuffled.setSerializer(serializer)
shuffled.map(_._2)
case _ => sys.error(s"Exchange not implemented for $newPartitioning")
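For readers skimming the diff, here is a condensed, standalone sketch of the decision table that needToCopyObjectsBeforeShuffle implements. The parameters stand in for the SparkEnv, SparkConf, and Serializer lookups performed in the real method (some of which are package-private); this is a paraphrase of the patch, not Spark's API.

// Condensed sketch of the copy decision introduced by this patch.
def needToCopy(
    hasNewOrdering: Boolean,                      // newOrdering.nonEmpty in the patch
    sortBasedShuffleOn: Boolean,                  // shuffleManager is a SortShuffleManager
    numPartitions: Int,                           // taken from the Partitioner in the patch
    bypassMergeThreshold: Int,                    // spark.shuffle.sort.bypassMergeThreshold, default 200
    serializeMapOutputs: Boolean,                 // spark.shuffle.sort.serializeMapOutputs, default true
    serializerSupportsRelocation: Boolean): Boolean = {
  if (hasNewOrdering) {
    // Records will be sorted by ExternalSorter, which buffers them: copy.
    true
  } else if (sortBasedShuffleOn) {
    if (numPartitions <= bypassMergeThreshold) {
      // Bypass path writes records straight to per-partition files: no copy.
      false
    } else if (serializeMapOutputs && serializerSupportsRelocation) {
      // Records are serialized before being buffered (SPARK-4550): no copy.
      false
    } else {
      // Deserialized records will be buffered in memory: copy.
      true
    }
  } else {
    // Hash-based shuffle streams records without buffering them: no copy.
    false
  }
}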