author    Michael Armbrust <michael@databricks.com>  2014-11-16 21:55:57 -0800
committer Reynold Xin <rxin@databricks.com>          2014-11-16 21:55:57 -0800
commit 64c6b9bad559c21f25cd9fbe37c8813cdab939f2 (patch)
tree   9ce4ae053c2e418abd19457014db6de6e4224cca /sql
parent 5168c6ca9f0008027d688661bae57c28cf386b54 (diff)
[SPARK-4410][SQL] Add support for external sort
Adds a new operator that uses Spark's `ExternalSort` class. It is off by default now, but we might consider making it the default if benchmarks show that it does not regress performance.

Author: Michael Armbrust <michael@databricks.com>

Closes #3268 from marmbrus/externalSort and squashes the following commits:

48b9726 [Michael Armbrust] comments
b98799d [Michael Armbrust] Add test
afd7562 [Michael Armbrust] Add support for external sort.
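As a usage sketch (not part of this patch, and assuming an in-scope `sqlContext` with the test tables registered): the new operator is gated behind the `spark.sql.planner.externalSort` hint added below, which can be flipped at runtime.

    // Sketch: enable the external-sort planner hint at runtime.
    // The key and its "false" default come from the SQLConf change below.
    sqlContext.setConf("spark.sql.planner.externalSort", "true")

    // SET statements route through the same conf, so this is equivalent:
    sqlContext.sql("SET spark.sql.planner.externalSort=true")

    // ORDER BY queries planned after this point use execution.ExternalSort.
    val sorted = sqlContext.sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC")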
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                    |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala   | 37
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala              | 16
4 files changed, 59 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index cd7d78e684..9697beb132 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -39,6 +39,10 @@ private[spark] object SQLConf {
val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
+ // Options that control which operators can be chosen by the query planner. These should be
+ // considered hints and may be ignored by future versions of Spark SQL.
+ val EXTERNAL_SORT = "spark.sql.planner.externalSort"
+
// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
@@ -96,6 +100,9 @@ private[sql] trait SQLConf {
private[spark] def parquetFilterPushDown =
getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+ /** When true the planner will use the external sort, which may spill to disk. */
+ private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
+
/**
* When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
* that evaluates expressions found in queries. In general this custom code runs much faster
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 03cd5bd627..7ef1f9f2c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -263,9 +263,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Distinct(child) =>
execution.Distinct(partial = false,
execution.Distinct(partial = true, planLater(child))) :: Nil
+
+ case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
+ execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil
case logical.Sort(sortExprs, child) =>
- // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
execution.Sort(sortExprs, global = true, planLater(child)):: Nil
+
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 1b8ba3ace2..e53723c176 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
import org.apache.spark.util.MutablePair
+import org.apache.spark.util.collection.ExternalSorter
/**
* :: DeveloperApi ::
@@ -189,6 +190,9 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
/**
* :: DeveloperApi ::
+ * Performs a sort on-heap.
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ * if necessary.
*/
@DeveloperApi
case class Sort(
@@ -199,12 +203,37 @@ case class Sort(
override def requiredChildDistribution =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
+ override def execute() = attachTree(this, "sort") {
+ child.execute().mapPartitions( { iterator =>
+ val ordering = newOrdering(sortOrder, child.output)
+ iterator.map(_.copy()).toArray.sorted(ordering).iterator
+ }, preservesPartitioning = true)
+ }
+
+ override def output = child.output
+}
+
+/**
+ * :: DeveloperApi ::
+ * Performs a sort, spilling to disk as needed.
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ * if necessary.
+ */
+@DeveloperApi
+case class ExternalSort(
+ sortOrder: Seq[SortOrder],
+ global: Boolean,
+ child: SparkPlan)
+ extends UnaryNode {
+ override def requiredChildDistribution =
+ if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
override def execute() = attachTree(this, "sort") {
- child.execute()
- .mapPartitions( { iterator =>
- val ordering = newOrdering(sortOrder, child.output)
- iterator.map(_.copy()).toArray.sorted(ordering).iterator
+ child.execute().mapPartitions( { iterator =>
+ val ordering = newOrdering(sortOrder, child.output)
+ val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
+ sorter.insertAll(iterator.map(r => (r, null)))
+ sorter.iterator.map(_._1)
}, preservesPartitioning = true)
}
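For readability, here is the per-partition pattern the new `ExternalSort.execute` uses, condensed out of the diff above. This is a sketch only: `ExternalSorter` is `private[spark]`, so it compiles only inside Spark's own source tree, and spilling needs a live `SparkEnv`.

    import org.apache.spark.sql.catalyst.expressions.Row
    import org.apache.spark.util.collection.ExternalSorter

    // Each row rides along as a key; there is no aggregation, so values are Null.
    def sortPartition(iterator: Iterator[Row], ordering: Ordering[Row]): Iterator[Row] = {
      val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
      // insertAll buffers rows in memory and spills sorted runs to disk
      // under memory pressure.
      sorter.insertAll(iterator.map(r => (r, null)))
      // The returned iterator merge-reads the runs back in sorted order;
      // drop the dummy null values on the way out.
      sorter.iterator.map(_._1)
    }

Unlike the on-heap `Sort`, which materializes an entire partition with `toArray` before sorting, this bounds memory use at the cost of disk I/O when spills occur.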
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ce5672c086..a63515464c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -196,7 +196,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
Seq(Seq("1")))
}
- test("sorting") {
+ def sortTest() = {
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC"),
Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
@@ -238,6 +238,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
mapData.collect().sortBy(_.data(1)).reverse.toSeq)
}
+ test("sorting") {
+ val before = externalSortEnabled
+ setConf(SQLConf.EXTERNAL_SORT, "false")
+ sortTest()
+ setConf(SQLConf.EXTERNAL_SORT, before.toString)
+ }
+
+ test("external sorting") {
+ val before = externalSortEnabled
+ setConf(SQLConf.EXTERNAL_SORT, "true")
+ sortTest()
+ setConf(SQLConf.EXTERNAL_SORT, before.toString)
+ }
+
test("limit") {
checkAnswer(
sql("SELECT * FROM testData LIMIT 10"),