aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-09-19 21:40:21 -0700
committerReynold Xin <rxin@databricks.com>2015-09-19 21:40:21 -0700
commit2117eea71ece825fbc3797c8b38184ae221f5223 (patch)
tree06481ef1968367118e89779335e24245f57f2017 /sql
parente789000b88a6bd840f821c53f42c08b97dc02496 (diff)
downloadspark-2117eea71ece825fbc3797c8b38184ae221f5223.tar.gz
spark-2117eea71ece825fbc3797c8b38184ae221f5223.tar.bz2
spark-2117eea71ece825fbc3797c8b38184ae221f5223.zip
[SPARK-10710] Remove ability to disable spilling in core and SQL
It does not make much sense to set `spark.shuffle.spill` or `spark.sql.planner.externalSort` to false: I believe that these configurations were initially added as "escape hatches" to guard against bugs in the external operators, but these operators are now mature and well-tested. In addition, these configurations are not handled in a consistent way anymore: SQL's Tungsten codepath ignores these configurations and will continue to use spilling operators. Similarly, Spark Core's `tungsten-sort` shuffle manager does not respect `spark.shuffle.spill=false`. This pull request removes these configurations, adds warnings at the appropriate places, and deletes a large amount of code which was only used in code paths that did not support spilling. Author: Josh Rosen <joshrosen@databricks.com> Closes #8831 from JoshRosen/remove-ability-to-disable-spilling.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala30
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala26
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala4
7 files changed, 19 insertions, 62 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9de75f4c4d..b9fb90d964 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -330,11 +330,6 @@ private[spark] object SQLConf {
// Options that control which operators can be chosen by the query planner. These should be
// considered hints and may be ignored by future versions of Spark SQL.
- val EXTERNAL_SORT = booleanConf("spark.sql.planner.externalSort",
- defaultValue = Some(true),
- doc = "When true, performs sorts spilling to disk as needed otherwise sort each partition in" +
- " memory.")
-
val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
defaultValue = Some(true),
doc = "When true, use sort merge join (as opposed to hash join) by default for large joins.")
@@ -422,6 +417,7 @@ private[spark] object SQLConf {
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+ val EXTERNAL_SORT = "spark.sql.planner.externalSort"
}
}
@@ -476,8 +472,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
- private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
-
private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5e40d77689..41b215c792 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -312,8 +312,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
TungstenSort.supportsSchema(child.schema)) {
execution.TungstenSort(sortExprs, global, child)
- } else if (sqlContext.conf.externalSortEnabled) {
- execution.ExternalSort(sortExprs, global, child)
} else {
execution.Sort(sortExprs, global, child)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 95209e6634..af28e2dfa4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -105,6 +105,15 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
}
(keyValueOutput, runFunc)
+ case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) =>
+ val runFunc = (sqlContext: SQLContext) => {
+ logWarning(
+ s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " +
+ s"External sort will continue to be used.")
+ Seq(Row(SQLConf.Deprecated.EXTERNAL_SORT, "true"))
+ }
+ (keyValueOutput, runFunc)
+
// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 40ef7c3b53..27f26245a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -31,38 +31,12 @@ import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
// This file defines various sort operators.
////////////////////////////////////////////////////////////////////////////////////////////////////
-
-/**
- * Performs a sort on-heap.
- * @param global when true performs a global sort of all partitions by shuffling the data first
- * if necessary.
- */
-case class Sort(
- sortOrder: Seq[SortOrder],
- global: Boolean,
- child: SparkPlan)
- extends UnaryNode {
- override def requiredChildDistribution: Seq[Distribution] =
- if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
-
- protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
- child.execute().mapPartitions( { iterator =>
- val ordering = newOrdering(sortOrder, child.output)
- iterator.map(_.copy()).toArray.sorted(ordering).iterator
- }, preservesPartitioning = true)
- }
-
- override def output: Seq[Attribute] = child.output
-
- override def outputOrdering: Seq[SortOrder] = sortOrder
-}
-
/**
* Performs a sort, spilling to disk as needed.
* @param global when true performs a global sort of all partitions by shuffling the data first
* if necessary.
*/
-case class ExternalSort(
+case class Sort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan)
@@ -93,7 +67,7 @@ case class ExternalSort(
}
/**
- * Optimized version of [[ExternalSort]] that operates on binary data (implemented as part of
+ * Optimized version of [[Sort]] that operates on binary data (implemented as part of
* Project Tungsten).
*
* @param global when true performs a global sort of all partitions by shuffling the data first
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f9981356f3..05b4127cbc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -581,28 +581,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
mapData.collect().sortBy(_.data(1)).reverse.map(Row.fromTuple).toSeq)
}
- test("sorting") {
- withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false") {
- sortTest()
- }
- }
-
test("external sorting") {
- withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true") {
- sortTest()
- }
- }
-
- test("SPARK-6927 sorting with codegen on") {
- withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false",
- SQLConf.CODEGEN_ENABLED.key -> "true") {
- sortTest()
- }
+ sortTest()
}
test("SPARK-6927 external sorting with codegen on") {
- withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true",
- SQLConf.CODEGEN_ENABLED.key -> "true") {
+ withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
sortTest()
}
}
@@ -1731,10 +1715,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("external sorting updates peak execution memory") {
- withSQLConf((SQLConf.EXTERNAL_SORT.key, "true")) {
- AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
- sortTest()
- }
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
+ sortTest()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 4492e37ad0..5dc37e5c3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -32,7 +32,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
case c: ConvertToSafe => c
}
- private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
+ private val outputsSafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
assert(!outputsSafe.outputsUnsafeRows)
private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
assert(outputsUnsafe.outputsUnsafeRows)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 3073d492e6..847c188a30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -36,13 +36,13 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
checkAnswer(
input.toDF("a", "b", "c"),
- ExternalSort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
+ Sort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
input.sortBy(t => (t._1, t._2)).map(Row.fromTuple),
sortAnswers = false)
checkAnswer(
input.toDF("a", "b", "c"),
- ExternalSort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
+ Sort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
input.sortBy(t => (t._2, t._1)).map(Row.fromTuple),
sortAnswers = false)
}