3 files changed, 24 insertions(+), 54 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index be786f9b7f..87f40482e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -161,7 +161,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
     getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
 
   /** When true the planner will use the external sort, which may spill to disk. */
-  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
+  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "true").toBoolean
 
   /**
    * Sort merge join would sort the two side of join first, and then iterate both sides together
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index f25d10fec0..6fa7ccc6cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -31,16 +31,6 @@ import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.util.MutablePair
 
-object Exchange {
-  /**
-   * Returns true when the ordering expressions are a subset of the key.
-   * if true, ShuffledRDD can use `setKeyOrdering(orderingKey)` to sort within [[Exchange]].
-   */
-  def canSortWithShuffle(partitioning: Partitioning, desiredOrdering: Seq[SortOrder]): Boolean = {
-    desiredOrdering.map(_.child).toSet.subsetOf(partitioning.keyExpressions.toSet)
-  }
-}
-
 /**
  * :: DeveloperApi ::
  * Performs a shuffle that will result in the desired `newPartitioning`. Optionally sorts each
@@ -143,7 +133,6 @@ case class Exchange(
   private def getSerializer(
       keySchema: Array[DataType],
       valueSchema: Array[DataType],
-      hasKeyOrdering: Boolean,
       numPartitions: Int): Serializer = {
     // It is true when there is no field that needs to be write out.
     // For now, we will not use SparkSqlSerializer2 when noField is true.
@@ -159,7 +148,7 @@ case class Exchange(
 
     val serializer = if (useSqlSerializer2) {
       logInfo("Using SparkSqlSerializer2.")
-      new SparkSqlSerializer2(keySchema, valueSchema, hasKeyOrdering)
+      new SparkSqlSerializer2(keySchema, valueSchema)
     } else {
       logInfo("Using SparkSqlSerializer.")
       new SparkSqlSerializer(sparkConf)
@@ -173,7 +162,7 @@ case class Exchange(
       case HashPartitioning(expressions, numPartitions) =>
         val keySchema = expressions.map(_.dataType).toArray
        val valueSchema = child.output.map(_.dataType).toArray
-        val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
+        val serializer = getSerializer(keySchema, valueSchema, numPartitions)
         val part = new HashPartitioner(numPartitions)
 
         val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
@@ -189,15 +178,12 @@ case class Exchange(
           }
         }
         val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
-        if (newOrdering.nonEmpty) {
-          shuffled.setKeyOrdering(keyOrdering)
-        }
         shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
       case RangePartitioning(sortingExpressions, numPartitions) =>
         val keySchema = child.output.map(_.dataType).toArray
-        val serializer = getSerializer(keySchema, null, newOrdering.nonEmpty, numPartitions)
+        val serializer = getSerializer(keySchema, null, numPartitions)
 
         val childRdd = child.execute()
         val part: Partitioner = {
@@ -222,15 +208,12 @@ case class Exchange(
         }
 
         val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
-        if (newOrdering.nonEmpty) {
-          shuffled.setKeyOrdering(keyOrdering)
-        }
         shuffled.setSerializer(serializer)
         shuffled.map(_._1)
 
       case SinglePartition =>
         val valueSchema = child.output.map(_.dataType).toArray
-        val serializer = getSerializer(null, valueSchema, hasKeyOrdering = false, 1)
+        val serializer = getSerializer(null, valueSchema, numPartitions = 1)
         val partitioner = new HashPartitioner(1)
 
         val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
@@ -306,29 +289,24 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
           child: SparkPlan): SparkPlan = {
         val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
         val needsShuffle = child.outputPartitioning != partitioning
-        val canSortWithShuffle = Exchange.canSortWithShuffle(partitioning, rowOrdering)
 
-        if (needSort && needsShuffle && canSortWithShuffle) {
-          Exchange(partitioning, rowOrdering, child)
+        val withShuffle = if (needsShuffle) {
+          Exchange(partitioning, Nil, child)
         } else {
-          val withShuffle = if (needsShuffle) {
-            Exchange(partitioning, Nil, child)
-          } else {
-            child
-          }
+          child
+        }
 
-          val withSort = if (needSort) {
-            if (sqlContext.conf.externalSortEnabled) {
-              ExternalSort(rowOrdering, global = false, withShuffle)
-            } else {
-              Sort(rowOrdering, global = false, withShuffle)
-            }
+        val withSort = if (needSort) {
+          if (sqlContext.conf.externalSortEnabled) {
+            ExternalSort(rowOrdering, global = false, withShuffle)
           } else {
-            withShuffle
+            Sort(rowOrdering, global = false, withShuffle)
           }
-
-          withSort
+        } else {
+          withShuffle
         }
+
+        withSort
       }
 
       if (meetsRequirements && compatible && !needsAnySort) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 202e4488a6..15b6936acd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -86,7 +86,6 @@ private[sql] class Serializer2SerializationStream(
 private[sql] class Serializer2DeserializationStream(
     keySchema: Array[DataType],
     valueSchema: Array[DataType],
-    hasKeyOrdering: Boolean,
     in: InputStream)
   extends DeserializationStream with Logging {
 
@@ -96,14 +95,9 @@ private[sql] class Serializer2DeserializationStream(
     if (schema == null) {
       () => null
     } else {
-      if (hasKeyOrdering) {
-        // We have key ordering specified in a ShuffledRDD, it is not safe to reuse a mutable row.
-        () => new GenericMutableRow(schema.length)
-      } else {
-        // It is safe to reuse the mutable row.
-        val mutableRow = new SpecificMutableRow(schema)
-        () => mutableRow
-      }
+      // It is safe to reuse the mutable row.
+      val mutableRow = new SpecificMutableRow(schema)
+      () => mutableRow
     }
   }
 
@@ -133,8 +127,7 @@ private[sql] class Serializer2DeserializationStream(
 
 private[sql] class SparkSqlSerializer2Instance(
     keySchema: Array[DataType],
-    valueSchema: Array[DataType],
-    hasKeyOrdering: Boolean)
+    valueSchema: Array[DataType])
   extends SerializerInstance {
 
   def serialize[T: ClassTag](t: T): ByteBuffer =
@@ -151,7 +144,7 @@ private[sql] class SparkSqlSerializer2Instance(
   }
 
   def deserializeStream(s: InputStream): DeserializationStream = {
-    new Serializer2DeserializationStream(keySchema, valueSchema, hasKeyOrdering, s)
+    new Serializer2DeserializationStream(keySchema, valueSchema, s)
   }
 }
 
@@ -164,14 +157,13 @@ private[sql] class SparkSqlSerializer2Instance(
  */
 private[sql] class SparkSqlSerializer2(
     keySchema: Array[DataType],
-    valueSchema: Array[DataType],
-    hasKeyOrdering: Boolean)
+    valueSchema: Array[DataType])
   extends Serializer
   with Logging
   with Serializable{
 
   def newInstance(): SerializerInstance =
-    new SparkSqlSerializer2Instance(keySchema, valueSchema, hasKeyOrdering)
+    new SparkSqlSerializer2Instance(keySchema, valueSchema)
 
   override def supportsRelocationOfSerializedObjects: Boolean = {
     // SparkSqlSerializer2 is stateless and writes no stream headers
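
Net effect of the patch: spark.sql.planner.externalSort now defaults to true, Exchange no longer pushes the required ordering into the shuffle via ShuffledRDD.setKeyOrdering, and any ordering requirement is always satisfied by a separate Sort/ExternalSort operator placed on top of the (optional) shuffle. Below is a minimal, self-contained Scala sketch of the rewritten addOperatorsIfNecessary decision; Plan, Leaf, Shuffle, and ExtSort are hypothetical stand-ins, not the real SparkPlan, Exchange, and ExternalSort classes.

// Hypothetical stand-ins for SparkPlan, Exchange, and ExternalSort.
sealed trait Plan { def partitioning: String; def ordering: Seq[String] }
case class Leaf(partitioning: String, ordering: Seq[String]) extends Plan
case class Shuffle(partitioning: String, child: Plan) extends Plan {
  // After this patch, shuffle output carries no ordering guarantee.
  def ordering: Seq[String] = Nil
}
case class ExtSort(ordering: Seq[String], child: Plan) extends Plan {
  def partitioning: String = child.partitioning
}

// Mirrors the rewritten addOperatorsIfNecessary: shuffle first if the
// partitioning is wrong, then add a separate sort if an ordering is needed.
def addOperatorsIfNecessary(
    partitioning: String,
    rowOrdering: Seq[String],
    child: Plan): Plan = {
  val needSort = rowOrdering.nonEmpty && child.ordering != rowOrdering
  val needsShuffle = child.partitioning != partitioning

  val withShuffle = if (needsShuffle) Shuffle(partitioning, child) else child
  if (needSort) ExtSort(rowOrdering, withShuffle) else withShuffle
}

// Before the patch, an ordering that was a subset of the hash key could be
// performed inside the shuffle itself (canSortWithShuffle); now the same
// input always yields a shuffle followed by an explicit sort:
println(addOperatorsIfNecessary("hash(a)", Seq("a"), Leaf("unknown", Nil)))
// ExtSort(List(a),Shuffle(hash(a),Leaf(unknown,List())))

The serializer simplification follows from this: per the removed comment in Serializer2DeserializationStream, a key-ordered ShuffledRDD buffered rows during sorting, making mutable-row reuse unsafe. With no ordering ever pushed into the shuffle, the deserializer can always return the same reused SpecificMutableRow, and the hasKeyOrdering flag disappears from SparkSqlSerializer2.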