author     Reynold Xin <rxin@databricks.com>   2015-11-15 10:33:53 -0800
committer  Reynold Xin <rxin@databricks.com>   2015-11-15 10:33:53 -0800
commit     d22fc10887fdc6a86f6122648a823d0d37d4d795 (patch)
tree       b50fefd8ba9965af31b254c7059d4d7fc9fb8497
parent     d83c2f9f0b08d6d5d369d9fae04cdb15448e7f0d (diff)
[SPARK-11734][SQL] Rename TungstenProject -> Project, TungstenSort -> Sort
I didn't remove the old Sort operator, since we still use it in randomized tests. I moved it into the test module and renamed it ReferenceSort.

Author: Reynold Xin <rxin@databricks.com>

Closes #9700 from rxin/SPARK-11734.
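In plain terms, the commit collapses the Tungsten-based operators onto the plain names and demotes the old row-based sort to a test-only reference implementation. The self-contained Scala sketch below is an editorial toy model written for this note, not part of the patch, and every name in it is illustrative; it only shows the shape of the naming after the change.

// Toy model of the physical operator names after SPARK-11734 (illustrative only).
object RenameSketch {
  sealed trait SparkPlan
  case class Scan(table: String) extends SparkPlan
  // Formerly TungstenProject; now simply Project.
  case class Project(projectList: Seq[String], child: SparkPlan) extends SparkPlan
  // Formerly TungstenSort; now simply Sort. The old row-based sort survives
  // only in the test module, renamed ReferenceSort.
  case class Sort(sortOrder: Seq[String], global: Boolean, child: SparkPlan) extends SparkPlan

  def main(args: Array[String]): Unit = {
    // Roughly what the planner emits for "SELECT a, b FROM t ORDER BY a".
    val plan = Sort(Seq("a ASC"), global = true, Project(Seq("a", "b"), Scan("t")))
    println(plan)
  }
}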
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala) | 55
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala | 61
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala | 69
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala | 86
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala | 12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala | 2
15 files changed, 148 insertions(+), 184 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index a161cf0a31..62cbc518e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -51,7 +51,7 @@ case class Exchange(
}
val simpleNodeName = if (tungstenMode) "TungstenExchange" else "Exchange"
- s"${simpleNodeName}${extraInfo}"
+ s"$simpleNodeName$extraInfo"
}
/**
@@ -475,10 +475,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
if (requiredOrdering.nonEmpty) {
// If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
- sqlContext.planner.BasicOperators.getSortOperator(
- requiredOrdering,
- global = false,
- child)
+ Sort(requiredOrdering, global = false, child = child)
} else {
child
}
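The EnsureRequirements hunk above only changes how the sort operator is constructed; the check that the required ordering is a prefix of the child's output ordering is unchanged. The following standalone sketch illustrates that check; it is toy code written for this note (orderings modeled as plain strings), not Spark's implementation.

// Illustration of the "required ordering is a prefix of the child's output
// ordering" test quoted in the hunk above (toy code, not Spark's).
object OrderingPrefixCheck {
  def needsSort(childOutputOrdering: Seq[String], requiredOrdering: Seq[String]): Boolean =
    requiredOrdering.nonEmpty &&
      requiredOrdering != childOutputOrdering.take(requiredOrdering.length)

  def main(args: Array[String]): Unit = {
    println(needsSort(Seq("a ASC", "b ASC"), Seq("a ASC")))  // false: [a] is a prefix of [a, b]
    println(needsSort(Seq("a ASC", "b ASC"), Seq("b ASC")))  // true: [b] is not a prefix
    println(needsSort(Seq.empty, Seq("a ASC")))              // true: child has no ordering
  }
}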
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 52ef00ef5b..24207cb46f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -17,68 +17,22 @@
package org.apache.spark.sql.execution
+import org.apache.spark.{InternalAccumulator, SparkEnv, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.CompletionIterator
-import org.apache.spark.util.collection.ExternalSorter
-import org.apache.spark.{InternalAccumulator, SparkEnv, TaskContext}
-
-////////////////////////////////////////////////////////////////////////////////////////////////////
-// This file defines various sort operators.
-////////////////////////////////////////////////////////////////////////////////////////////////////
/**
- * Performs a sort, spilling to disk as needed.
- * @param global when true performs a global sort of all partitions by shuffling the data first
- * if necessary.
- */
-case class Sort(
- sortOrder: Seq[SortOrder],
- global: Boolean,
- child: SparkPlan)
- extends UnaryNode {
-
- override def requiredChildDistribution: Seq[Distribution] =
- if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
-
- protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
- child.execute().mapPartitionsInternal( { iterator =>
- val ordering = newOrdering(sortOrder, child.output)
- val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
- TaskContext.get(), ordering = Some(ordering))
- sorter.insertAll(iterator.map(r => (r.copy(), null)))
- val baseIterator = sorter.iterator.map(_._1)
- val context = TaskContext.get()
- context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
- context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
- context.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
- // TODO(marmbrus): The complex type signature below thwarts inference for no reason.
- CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
- }, preservesPartitioning = true)
- }
-
- override def output: Seq[Attribute] = child.output
-
- override def outputOrdering: Seq[SortOrder] = sortOrder
-}
-
-/**
- * Optimized version of [[Sort]] that operates on binary data (implemented as part of
- * Project Tungsten).
+ * Performs (external) sorting.
*
* @param global when true performs a global sort of all partitions by shuffling the data first
* if necessary.
* @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
* spill every `frequency` records.
*/
-
-case class TungstenSort(
+case class Sort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan,
@@ -107,7 +61,7 @@ case class TungstenSort(
val dataSize = longMetric("dataSize")
val spillSize = longMetric("spillSize")
- child.execute().mapPartitions { iter =>
+ child.execute().mapPartitionsInternal { iter =>
val ordering = newOrdering(sortOrder, childOutput)
// The comparator for comparing prefix
@@ -143,5 +97,4 @@ case class TungstenSort(
sortedIterator
}
}
-
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index b7c5476346..6e9a4df828 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -80,7 +80,7 @@ class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
filterCondition.map(Filter(_, scan)).getOrElse(scan)
} else {
val scan = scanBuilder((projectSet ++ filterSet).toSeq)
- TungstenProject(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
+ Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 67201a2c19..3d4ce633c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -302,16 +302,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object BasicOperators extends Strategy {
def numPartitions: Int = self.numPartitions
- /**
- * Picks an appropriate sort operator.
- *
- * @param global when true performs a global sort of all partitions by shuffling the data first
- * if necessary.
- */
- def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
- execution.TungstenSort(sortExprs, global, child)
- }
-
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommand(r) :: Nil
@@ -339,11 +329,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
- getSortOperator(sortExprs, global = false, planLater(child)) :: Nil
+ execution.Sort(sortExprs, global = false, child = planLater(child)) :: Nil
case logical.Sort(sortExprs, global, child) =>
- getSortOperator(sortExprs, global, planLater(child)):: Nil
+ execution.Sort(sortExprs, global, planLater(child)) :: Nil
case logical.Project(projectList, child) =>
- execution.TungstenProject(projectList, planLater(child)) :: Nil
+ execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
execution.Filter(condition, planLater(child)) :: Nil
case e @ logical.Expand(_, _, child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 07925c62cd..e79092efda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -30,7 +30,7 @@ import org.apache.spark.util.random.PoissonSampler
import org.apache.spark.{HashPartitioner, SparkEnv}
-case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
+case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
override private[sql] lazy val metrics = Map(
"numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 824c89a90e..9bbbfa7c77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -343,7 +343,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation)
- execution.TungstenProject(
+ execution.Project(
projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 8674da7a79..3eae3f6d85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.scalatest.Matchers._
-import org.apache.spark.sql.execution.TungstenProject
+import org.apache.spark.sql.execution.Project
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@@ -619,7 +619,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
val projects = df.queryExecution.executedPlan.collect {
- case tungstenProject: TungstenProject => tungstenProject
+ case tungstenProject: Project => tungstenProject
}
assert(projects.size === expectedNumProjects)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 8c41d79dae..be53ec3e27 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -365,7 +365,7 @@ class PlannerSuite extends SharedSQLContext {
)
val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
- if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.isEmpty) {
+ if (outputPlan.collect { case s: Sort => true }.isEmpty) {
fail(s"Sort should have been added:\n$outputPlan")
}
}
@@ -381,7 +381,7 @@ class PlannerSuite extends SharedSQLContext {
)
val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
- if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.nonEmpty) {
+ if (outputPlan.collect { case s: Sort => true }.nonEmpty) {
fail(s"No sorts should have been added:\n$outputPlan")
}
}
@@ -398,7 +398,7 @@ class PlannerSuite extends SharedSQLContext {
)
val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
- if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.isEmpty) {
+ if (outputPlan.collect { case s: Sort => true }.isEmpty) {
fail(s"Sort should have been added:\n$outputPlan")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
new file mode 100644
index 0000000000..9575d26fd1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{InternalAccumulator, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.collection.ExternalSorter
+
+
+/**
+ * A reference sort implementation used to compare against our normal sort.
+ */
+case class ReferenceSort(
+ sortOrder: Seq[SortOrder],
+ global: Boolean,
+ child: SparkPlan)
+ extends UnaryNode {
+
+ override def requiredChildDistribution: Seq[Distribution] =
+ if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
+
+ protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
+ child.execute().mapPartitions( { iterator =>
+ val ordering = newOrdering(sortOrder, child.output)
+ val sorter = new ExternalSorter[InternalRow, Null, InternalRow](
+ TaskContext.get(), ordering = Some(ordering))
+ sorter.insertAll(iterator.map(r => (r.copy(), null)))
+ val baseIterator = sorter.iterator.map(_._1)
+ val context = TaskContext.get()
+ context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
+ context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+ context.internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
+ CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
+ }, preservesPartitioning = true)
+ }
+
+ override def output: Seq[Attribute] = child.output
+
+ override def outputOrdering: Seq[SortOrder] = sortOrder
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index b3fceeab64..6876ab0f02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -33,9 +33,9 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
case c: ConvertToSafe => c
}
- private val outputsSafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
+ private val outputsSafe = ReferenceSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
assert(!outputsSafe.outputsUnsafeRows)
- private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
+ private val outputsUnsafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
assert(outputsUnsafe.outputsUnsafeRows)
test("planner should insert unsafe->safe conversions when required") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 847c188a30..e5d34be4c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -17,15 +17,22 @@
package org.apache.spark.sql.execution
-import org.apache.spark.sql.Row
+import scala.util.Random
+
+import org.apache.spark.AccumulatorSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{RandomDataGenerator, Row}
+
+/**
+ * Test sorting. Many of the test cases generate random data and compares the sorted result with one
+ * sorted by a reference implementation ([[ReferenceSort]]).
+ */
class SortSuite extends SparkPlanTest with SharedSQLContext {
import testImplicits.localSeqToDataFrameHolder
- // This test was originally added as an example of how to use [[SparkPlanTest]];
- // it's not designed to be a comprehensive test of ExternalSort.
test("basic sorting using ExternalSort") {
val input = Seq(
@@ -36,14 +43,66 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
checkAnswer(
input.toDF("a", "b", "c"),
- Sort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
+ (child: SparkPlan) => Sort('a.asc :: 'b.asc :: Nil, global = true, child = child),
input.sortBy(t => (t._1, t._2)).map(Row.fromTuple),
sortAnswers = false)
checkAnswer(
input.toDF("a", "b", "c"),
- Sort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
+ (child: SparkPlan) => Sort('b.asc :: 'a.asc :: Nil, global = true, child = child),
input.sortBy(t => (t._2, t._1)).map(Row.fromTuple),
sortAnswers = false)
}
+
+ test("sort followed by limit") {
+ checkThatPlansAgree(
+ (1 to 100).map(v => Tuple1(v)).toDF("a"),
+ (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child = child)),
+ (child: SparkPlan) => Limit(10, ReferenceSort('a.asc :: Nil, global = true, child)),
+ sortAnswers = false
+ )
+ }
+
+ test("sorting does not crash for large inputs") {
+ val sortOrder = 'a.asc :: Nil
+ val stringLength = 1024 * 1024 * 2
+ checkThatPlansAgree(
+ Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
+ Sort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
+ ReferenceSort(sortOrder, global = true, _: SparkPlan),
+ sortAnswers = false
+ )
+ }
+
+ test("sorting updates peak execution memory") {
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "unsafe external sort") {
+ checkThatPlansAgree(
+ (1 to 100).map(v => Tuple1(v)).toDF("a"),
+ (child: SparkPlan) => Sort('a.asc :: Nil, global = true, child = child),
+ (child: SparkPlan) => ReferenceSort('a.asc :: Nil, global = true, child),
+ sortAnswers = false)
+ }
+ }
+
+ // Test sorting on different data types
+ for (
+ dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);
+ nullable <- Seq(true, false);
+ sortOrder <- Seq('a.asc :: Nil, 'a.desc :: Nil);
+ randomDataGenerator <- RandomDataGenerator.forType(dataType, nullable)
+ ) {
+ test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") {
+ val inputData = Seq.fill(1000)(randomDataGenerator())
+ val inputDf = sqlContext.createDataFrame(
+ sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
+ StructType(StructField("a", dataType, nullable = true) :: Nil)
+ )
+ checkThatPlansAgree(
+ inputDf,
+ p => ConvertToSafe(Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23)),
+ ReferenceSort(sortOrder, global = true, _: SparkPlan),
+ sortAnswers = false
+ )
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
deleted file mode 100644
index 7c860d1d58..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import scala.util.Random
-
-import org.apache.spark.AccumulatorSuite
-import org.apache.spark.sql.{RandomDataGenerator, Row, SQLConf}
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types._
-
-/**
- * A test suite that generates randomized data to test the [[TungstenSort]] operator.
- */
-class TungstenSortSuite extends SparkPlanTest with SharedSQLContext {
- import testImplicits.localSeqToDataFrameHolder
-
- test("sort followed by limit") {
- checkThatPlansAgree(
- (1 to 100).map(v => Tuple1(v)).toDF("a"),
- (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)),
- (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)),
- sortAnswers = false
- )
- }
-
- test("sorting does not crash for large inputs") {
- val sortOrder = 'a.asc :: Nil
- val stringLength = 1024 * 1024 * 2
- checkThatPlansAgree(
- Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1),
- TungstenSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
- Sort(sortOrder, global = true, _: SparkPlan),
- sortAnswers = false
- )
- }
-
- test("sorting updates peak execution memory") {
- AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "unsafe external sort") {
- checkThatPlansAgree(
- (1 to 100).map(v => Tuple1(v)).toDF("a"),
- (child: SparkPlan) => TungstenSort('a.asc :: Nil, true, child),
- (child: SparkPlan) => Sort('a.asc :: Nil, global = true, child),
- sortAnswers = false)
- }
- }
-
- // Test sorting on different data types
- for (
- dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);
- nullable <- Seq(true, false);
- sortOrder <- Seq('a.asc :: Nil, 'a.desc :: Nil);
- randomDataGenerator <- RandomDataGenerator.forType(dataType, nullable)
- ) {
- test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") {
- val inputData = Seq.fill(1000)(randomDataGenerator())
- val inputDf = sqlContext.createDataFrame(
- sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))),
- StructType(StructField("a", dataType, nullable = true) :: Nil)
- )
- checkThatPlansAgree(
- inputDf,
- plan => ConvertToSafe(
- TungstenSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)),
- Sort(sortOrder, global = true, _: SparkPlan),
- sortAnswers = false
- )
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 486bfbbd70..5e2b4154dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -114,17 +114,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
// PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
val df = person.select('name)
testSparkPlanMetrics(df, 1, Map(
- 0L ->("TungstenProject", Map(
- "number of rows" -> 2L)))
- )
- }
-
- test("TungstenProject metrics") {
- // Assume the execution plan is
- // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
- val df = person.select('name)
- testSparkPlanMetrics(df, 1, Map(
- 0L ->("TungstenProject", Map(
+ 0L ->("Project", Map(
"number of rows" -> 2L)))
)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
index 4cf4e13890..5bd323ea09 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.catalyst.expressions.{Cast, EqualTo}
-import org.apache.spark.sql.execution.TungstenProject
+import org.apache.spark.sql.execution.Project
import org.apache.spark.sql.hive.test.TestHive
/**
@@ -44,7 +44,7 @@ class HiveTypeCoercionSuite extends HiveComparisonTest {
test("[SPARK-2210] boolean cast on boolean value should be removed") {
val q = "select cast(cast(key=0 as boolean) as boolean) from src"
val project = TestHive.sql(q).queryExecution.executedPlan.collect {
- case e: TungstenProject => e
+ case e: Project => e
}.head
// No cast expression introduced
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index b6db622580..e866493ee6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -151,7 +151,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
val physicalPlan = df.queryExecution.executedPlan
- assert(physicalPlan.collect { case p: execution.TungstenProject => p }.length === 1)
+ assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
}
}