aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-06-11 22:15:15 -0700
committerMichael Armbrust <michael@databricks.com>2015-06-11 22:15:15 -0700
commitb9d177c5110cd054fdb9bcbeeb5f4ca9aa645dc1 (patch)
tree55a44418f5f434521aa7f2d43c36bed52758c163 /sql
parent767cc94ca6d397ba19226996ccb3c8e57083c549 (diff)
downloadspark-b9d177c5110cd054fdb9bcbeeb5f4ca9aa645dc1.tar.gz
spark-b9d177c5110cd054fdb9bcbeeb5f4ca9aa645dc1.tar.bz2
spark-b9d177c5110cd054fdb9bcbeeb5f4ca9aa645dc1.zip
[SPARK-8317] [SQL] Do not push sort into shuffle in Exchange operator
In some cases, Spark SQL pushes sorting operations into the shuffle layer by specifying a key ordering as part of the shuffle dependency. I think that we should not do this: - Since we do not delegate aggregation to Spark's shuffle, specifying the keyOrdering as part of the shuffle has no effect on the shuffle map side. - By performing the shuffle ourselves (by inserting a sort operator after the shuffle instead), we can use the Exchange planner to choose specialized sorting implementations based on the types of rows being sorted. - We can remove some complexity from SqlSerializer2 by not requiring it to know about sort orderings, since SQL's own sort operators will already perform the necessary defensive copying. This patch removes Exchange's `canSortWithShuffle` path and the associated code in `SqlSerializer2`. Shuffles that used to go through the `canSortWithShuffle` path would always wind up using Spark's `ExternalSorter` (inside of `HashShuffleReader`); to avoid a performance regression as a result of handling these shuffles ourselves, I've changed the SQLConf defaults so that external sorting is enabled by default. Author: Josh Rosen <joshrosen@databricks.com> Closes #6772 from JoshRosen/SPARK-8317 and squashes the following commits: ebf9c0f [Josh Rosen] Do not push sort into shuffle in Exchange operator bf3b4c8 [Josh Rosen] Enable external sort by default
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala54
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala22
3 files changed, 24 insertions, 54 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index be786f9b7f..87f40482e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -161,7 +161,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
/** When true the planner will use the external sort, which may spill to disk. */
- private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
+ private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "true").toBoolean
/**
* Sort merge join would sort the two side of join first, and then iterate both sides together
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index f25d10fec0..6fa7ccc6cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -31,16 +31,6 @@ import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.util.MutablePair
-object Exchange {
- /**
- * Returns true when the ordering expressions are a subset of the key.
- * if true, ShuffledRDD can use `setKeyOrdering(orderingKey)` to sort within [[Exchange]].
- */
- def canSortWithShuffle(partitioning: Partitioning, desiredOrdering: Seq[SortOrder]): Boolean = {
- desiredOrdering.map(_.child).toSet.subsetOf(partitioning.keyExpressions.toSet)
- }
-}
-
/**
* :: DeveloperApi ::
* Performs a shuffle that will result in the desired `newPartitioning`. Optionally sorts each
@@ -143,7 +133,6 @@ case class Exchange(
private def getSerializer(
keySchema: Array[DataType],
valueSchema: Array[DataType],
- hasKeyOrdering: Boolean,
numPartitions: Int): Serializer = {
// It is true when there is no field that needs to be write out.
// For now, we will not use SparkSqlSerializer2 when noField is true.
@@ -159,7 +148,7 @@ case class Exchange(
val serializer = if (useSqlSerializer2) {
logInfo("Using SparkSqlSerializer2.")
- new SparkSqlSerializer2(keySchema, valueSchema, hasKeyOrdering)
+ new SparkSqlSerializer2(keySchema, valueSchema)
} else {
logInfo("Using SparkSqlSerializer.")
new SparkSqlSerializer(sparkConf)
@@ -173,7 +162,7 @@ case class Exchange(
case HashPartitioning(expressions, numPartitions) =>
val keySchema = expressions.map(_.dataType).toArray
val valueSchema = child.output.map(_.dataType).toArray
- val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
+ val serializer = getSerializer(keySchema, valueSchema, numPartitions)
val part = new HashPartitioner(numPartitions)
val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
@@ -189,15 +178,12 @@ case class Exchange(
}
}
val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
- if (newOrdering.nonEmpty) {
- shuffled.setKeyOrdering(keyOrdering)
- }
shuffled.setSerializer(serializer)
shuffled.map(_._2)
case RangePartitioning(sortingExpressions, numPartitions) =>
val keySchema = child.output.map(_.dataType).toArray
- val serializer = getSerializer(keySchema, null, newOrdering.nonEmpty, numPartitions)
+ val serializer = getSerializer(keySchema, null, numPartitions)
val childRdd = child.execute()
val part: Partitioner = {
@@ -222,15 +208,12 @@ case class Exchange(
}
val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
- if (newOrdering.nonEmpty) {
- shuffled.setKeyOrdering(keyOrdering)
- }
shuffled.setSerializer(serializer)
shuffled.map(_._1)
case SinglePartition =>
val valueSchema = child.output.map(_.dataType).toArray
- val serializer = getSerializer(null, valueSchema, hasKeyOrdering = false, 1)
+ val serializer = getSerializer(null, valueSchema, numPartitions = 1)
val partitioner = new HashPartitioner(1)
val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
@@ -306,29 +289,24 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
child: SparkPlan): SparkPlan = {
val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
val needsShuffle = child.outputPartitioning != partitioning
- val canSortWithShuffle = Exchange.canSortWithShuffle(partitioning, rowOrdering)
- if (needSort && needsShuffle && canSortWithShuffle) {
- Exchange(partitioning, rowOrdering, child)
+ val withShuffle = if (needsShuffle) {
+ Exchange(partitioning, Nil, child)
} else {
- val withShuffle = if (needsShuffle) {
- Exchange(partitioning, Nil, child)
- } else {
- child
- }
+ child
+ }
- val withSort = if (needSort) {
- if (sqlContext.conf.externalSortEnabled) {
- ExternalSort(rowOrdering, global = false, withShuffle)
- } else {
- Sort(rowOrdering, global = false, withShuffle)
- }
+ val withSort = if (needSort) {
+ if (sqlContext.conf.externalSortEnabled) {
+ ExternalSort(rowOrdering, global = false, withShuffle)
} else {
- withShuffle
+ Sort(rowOrdering, global = false, withShuffle)
}
-
- withSort
+ } else {
+ withShuffle
}
+
+ withSort
}
if (meetsRequirements && compatible && !needsAnySort) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 202e4488a6..15b6936acd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -86,7 +86,6 @@ private[sql] class Serializer2SerializationStream(
private[sql] class Serializer2DeserializationStream(
keySchema: Array[DataType],
valueSchema: Array[DataType],
- hasKeyOrdering: Boolean,
in: InputStream)
extends DeserializationStream with Logging {
@@ -96,14 +95,9 @@ private[sql] class Serializer2DeserializationStream(
if (schema == null) {
() => null
} else {
- if (hasKeyOrdering) {
- // We have key ordering specified in a ShuffledRDD, it is not safe to reuse a mutable row.
- () => new GenericMutableRow(schema.length)
- } else {
- // It is safe to reuse the mutable row.
- val mutableRow = new SpecificMutableRow(schema)
- () => mutableRow
- }
+ // It is safe to reuse the mutable row.
+ val mutableRow = new SpecificMutableRow(schema)
+ () => mutableRow
}
}
@@ -133,8 +127,7 @@ private[sql] class Serializer2DeserializationStream(
private[sql] class SparkSqlSerializer2Instance(
keySchema: Array[DataType],
- valueSchema: Array[DataType],
- hasKeyOrdering: Boolean)
+ valueSchema: Array[DataType])
extends SerializerInstance {
def serialize[T: ClassTag](t: T): ByteBuffer =
@@ -151,7 +144,7 @@ private[sql] class SparkSqlSerializer2Instance(
}
def deserializeStream(s: InputStream): DeserializationStream = {
- new Serializer2DeserializationStream(keySchema, valueSchema, hasKeyOrdering, s)
+ new Serializer2DeserializationStream(keySchema, valueSchema, s)
}
}
@@ -164,14 +157,13 @@ private[sql] class SparkSqlSerializer2Instance(
*/
private[sql] class SparkSqlSerializer2(
keySchema: Array[DataType],
- valueSchema: Array[DataType],
- hasKeyOrdering: Boolean)
+ valueSchema: Array[DataType])
extends Serializer
with Logging
with Serializable{
def newInstance(): SerializerInstance =
- new SparkSqlSerializer2Instance(keySchema, valueSchema, hasKeyOrdering)
+ new SparkSqlSerializer2Instance(keySchema, valueSchema)
override def supportsRelocationOfSerializedObjects: Boolean = {
// SparkSqlSerializer2 is stateless and writes no stream headers