-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                        2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala            54
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala  22
3 files changed, 24 insertions, 54 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index be786f9b7f..87f40482e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -161,7 +161,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
/** When true the planner will use the external sort, which may spill to disk. */
- private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
+ private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "true").toBoolean
/**
* Sort merge join would sort the two sides of the join first, and then iterate both sides together
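
The SQLConf hunk above only flips the default of externalSortEnabled from false to true, so the spill-to-disk ExternalSort operator is now chosen unless the user opts out. A minimal usage sketch, assuming the key is "spark.sql.planner.externalSort" as in the 1.x SQLConf (the key itself is not shown in this diff):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("external-sort-toggle").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Restore the previous default (in-memory Sort instead of ExternalSort).
sqlContext.setConf("spark.sql.planner.externalSort", "false")
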
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index f25d10fec0..6fa7ccc6cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -31,16 +31,6 @@ import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.util.MutablePair
-object Exchange {
- /**
- * Returns true when the ordering expressions are a subset of the key.
- * if true, ShuffledRDD can use `setKeyOrdering(orderingKey)` to sort within [[Exchange]].
- */
- def canSortWithShuffle(partitioning: Partitioning, desiredOrdering: Seq[SortOrder]): Boolean = {
- desiredOrdering.map(_.child).toSet.subsetOf(partitioning.keyExpressions.toSet)
- }
-}
-
/**
* :: DeveloperApi ::
* Performs a shuffle that will result in the desired `newPartitioning`. Optionally sorts each
@@ -143,7 +133,6 @@ case class Exchange(
private def getSerializer(
keySchema: Array[DataType],
valueSchema: Array[DataType],
- hasKeyOrdering: Boolean,
numPartitions: Int): Serializer = {
// It is true when there is no field that needs to be written out.
// For now, we will not use SparkSqlSerializer2 when noField is true.
@@ -159,7 +148,7 @@ case class Exchange(
val serializer = if (useSqlSerializer2) {
logInfo("Using SparkSqlSerializer2.")
- new SparkSqlSerializer2(keySchema, valueSchema, hasKeyOrdering)
+ new SparkSqlSerializer2(keySchema, valueSchema)
} else {
logInfo("Using SparkSqlSerializer.")
new SparkSqlSerializer(sparkConf)
@@ -173,7 +162,7 @@ case class Exchange(
case HashPartitioning(expressions, numPartitions) =>
val keySchema = expressions.map(_.dataType).toArray
val valueSchema = child.output.map(_.dataType).toArray
- val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
+ val serializer = getSerializer(keySchema, valueSchema, numPartitions)
val part = new HashPartitioner(numPartitions)
val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
@@ -189,15 +178,12 @@ case class Exchange(
}
}
val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
- if (newOrdering.nonEmpty) {
- shuffled.setKeyOrdering(keyOrdering)
- }
shuffled.setSerializer(serializer)
shuffled.map(_._2)
case RangePartitioning(sortingExpressions, numPartitions) =>
val keySchema = child.output.map(_.dataType).toArray
- val serializer = getSerializer(keySchema, null, newOrdering.nonEmpty, numPartitions)
+ val serializer = getSerializer(keySchema, null, numPartitions)
val childRdd = child.execute()
val part: Partitioner = {
@@ -222,15 +208,12 @@ case class Exchange(
}
val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
- if (newOrdering.nonEmpty) {
- shuffled.setKeyOrdering(keyOrdering)
- }
shuffled.setSerializer(serializer)
shuffled.map(_._1)
case SinglePartition =>
val valueSchema = child.output.map(_.dataType).toArray
- val serializer = getSerializer(null, valueSchema, hasKeyOrdering = false, 1)
+ val serializer = getSerializer(null, valueSchema, numPartitions = 1)
val partitioner = new HashPartitioner(1)
val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
@@ -306,29 +289,24 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
child: SparkPlan): SparkPlan = {
val needSort = rowOrdering.nonEmpty && child.outputOrdering != rowOrdering
val needsShuffle = child.outputPartitioning != partitioning
- val canSortWithShuffle = Exchange.canSortWithShuffle(partitioning, rowOrdering)
- if (needSort && needsShuffle && canSortWithShuffle) {
- Exchange(partitioning, rowOrdering, child)
+ val withShuffle = if (needsShuffle) {
+ Exchange(partitioning, Nil, child)
} else {
- val withShuffle = if (needsShuffle) {
- Exchange(partitioning, Nil, child)
- } else {
- child
- }
+ child
+ }
- val withSort = if (needSort) {
- if (sqlContext.conf.externalSortEnabled) {
- ExternalSort(rowOrdering, global = false, withShuffle)
- } else {
- Sort(rowOrdering, global = false, withShuffle)
- }
+ val withSort = if (needSort) {
+ if (sqlContext.conf.externalSortEnabled) {
+ ExternalSort(rowOrdering, global = false, withShuffle)
} else {
- withShuffle
+ Sort(rowOrdering, global = false, withShuffle)
}
-
- withSort
+ } else {
+ withShuffle
}
+
+ withSort
}
if (meetsRequirements && compatible && !needsAnySort) {
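
Read without the diff markers, the net effect of the Exchange.scala changes is that EnsureRequirements no longer tries to sort inside the shuffle (the canSortWithShuffle / setKeyOrdering path is gone); it always plans the shuffle and the sort as two separate operators. A sketch of the resulting control flow, assuming the surrounding names (needsShuffle, needSort, partitioning, rowOrdering, child, and the Exchange/Sort/ExternalSort nodes) are in scope as in the diff:

// Shuffle is planned without any ordering pushed into it.
val withShuffle =
  if (needsShuffle) Exchange(partitioning, Nil, child)
  else child

// Sorting, when required, is added as its own operator on top of the shuffle.
val withSort =
  if (needSort) {
    if (sqlContext.conf.externalSortEnabled) ExternalSort(rowOrdering, global = false, withShuffle)
    else Sort(rowOrdering, global = false, withShuffle)
  } else {
    withShuffle
  }

withSort
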
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 202e4488a6..15b6936acd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -86,7 +86,6 @@ private[sql] class Serializer2SerializationStream(
private[sql] class Serializer2DeserializationStream(
keySchema: Array[DataType],
valueSchema: Array[DataType],
- hasKeyOrdering: Boolean,
in: InputStream)
extends DeserializationStream with Logging {
@@ -96,14 +95,9 @@ private[sql] class Serializer2DeserializationStream(
if (schema == null) {
() => null
} else {
- if (hasKeyOrdering) {
- // We have key ordering specified in a ShuffledRDD, it is not safe to reuse a mutable row.
- () => new GenericMutableRow(schema.length)
- } else {
- // It is safe to reuse the mutable row.
- val mutableRow = new SpecificMutableRow(schema)
- () => mutableRow
- }
+ // It is safe to reuse the mutable row.
+ val mutableRow = new SpecificMutableRow(schema)
+ () => mutableRow
}
}
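
With setKeyOrdering removed from the shuffle, the deserialization stream can always reuse one mutable row: the removed branch existed because a sort over the raw shuffle output retains references to previously returned rows, which a reused row would corrupt. An illustrative sketch of that reuse pattern with simplified stand-in types, not the actual Spark classes:

// One buffer allocated per stream, overwritten on every record.
final class ReusingReader(fieldCount: Int) {
  private val row = new Array[Any](fieldCount)
  def next(values: Array[Any]): Array[Any] = {
    System.arraycopy(values, 0, row, 0, fieldCount)
    row // callers that need to keep the row must copy it first
  }
}
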
@@ -133,8 +127,7 @@ private[sql] class Serializer2DeserializationStream(
private[sql] class SparkSqlSerializer2Instance(
keySchema: Array[DataType],
- valueSchema: Array[DataType],
- hasKeyOrdering: Boolean)
+ valueSchema: Array[DataType])
extends SerializerInstance {
def serialize[T: ClassTag](t: T): ByteBuffer =
@@ -151,7 +144,7 @@ private[sql] class SparkSqlSerializer2Instance(
}
def deserializeStream(s: InputStream): DeserializationStream = {
- new Serializer2DeserializationStream(keySchema, valueSchema, hasKeyOrdering, s)
+ new Serializer2DeserializationStream(keySchema, valueSchema, s)
}
}
@@ -164,14 +157,13 @@ private[sql] class SparkSqlSerializer2Instance(
*/
private[sql] class SparkSqlSerializer2(
keySchema: Array[DataType],
- valueSchema: Array[DataType],
- hasKeyOrdering: Boolean)
+ valueSchema: Array[DataType])
extends Serializer
with Logging
with Serializable{
def newInstance(): SerializerInstance =
- new SparkSqlSerializer2Instance(keySchema, valueSchema, hasKeyOrdering)
+ new SparkSqlSerializer2Instance(keySchema, valueSchema)
override def supportsRelocationOfSerializedObjects: Boolean = {
// SparkSqlSerializer2 is stateless and writes no stream headers
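
With the hasKeyOrdering parameter gone, SparkSqlSerializer2 is constructed from just the key and value schemas. A hedged construction sketch (ignoring the private[sql] visibility, and assuming the org.apache.spark.sql.types imports of Spark 1.x):

import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

val keySchema: Array[DataType] = Array(IntegerType)
val valueSchema: Array[DataType] = Array(IntegerType, StringType)

// Two-argument constructor after this change; newInstance() wires up the
// matching SparkSqlSerializer2Instance.
val serializer = new SparkSqlSerializer2(keySchema, valueSchema)
val instance = serializer.newInstance()
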