author    Reynold Xin <rxin@databricks.com>  2015-11-11 12:47:02 -0800
committer Reynold Xin <rxin@databricks.com>  2015-11-11 12:47:02 -0800
commit    df97df2b39194f60051f78cce23f0ba6cfe4b1df (patch)
tree      372c82c02e3bd4e4a5895adfb954534ed42a5ae7
parent    27029bc8f6246514bd0947500c94cf38dc8616c3 (diff)
[SPARK-11644][SQL] Remove the option to turn off unsafe and codegen.
Author: Reynold Xin <rxin@databricks.com>

Closes #9618 from rxin/SPARK-11644.
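With this patch the removed options survive only as deprecated keys handled by SetCommand (see the commands.scala hunk below), so setting them no longer changes execution. A minimal spark-shell sketch of the expected behavior, assuming a build that includes this change (`sc` is the shell's SparkContext; the session below is illustrative, not part of the patch):

// Hypothetical spark-shell session against a build containing this patch.
import org.apache.spark.sql.SQLContext
val sqlContext = SQLContext.getOrCreate(sc)

// SetCommand treats the removed keys as deprecated: it logs a warning and
// echoes "true" for the key instead of altering execution.
sqlContext.sql("SET spark.sql.codegen=false").collect()
// roughly: Array([spark.sql.codegen,true]), plus a deprecation warning in the logs
sqlContext.sql("SET spark.sql.unsafe.enabled=false").collect()
// roughly: Array([spark.sql.unsafe.enabled,true])

// Codegen and unsafe execution can no longer be turned off; supported schemas
// always plan with the Tungsten operators (e.g. TungstenProject, TungstenAggregate).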
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala  27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala  120
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala  13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala  27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala  80
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala  31
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala  68
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala  23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala  13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala  23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala  21
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala  123
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala  44
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala  3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala  72
27 files changed, 257 insertions(+), 494 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 57d7d30e0e..e02b502b7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -252,24 +252,8 @@ private[spark] object SQLConf {
"not be provided to ExchangeCoordinator.",
isPublic = false)
- val TUNGSTEN_ENABLED = booleanConf("spark.sql.tungsten.enabled",
- defaultValue = Some(true),
- doc = "When true, use the optimized Tungsten physical execution backend which explicitly " +
- "manages memory and dynamically generates bytecode for expression evaluation.")
-
- val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
- defaultValue = Some(true), // use TUNGSTEN_ENABLED as default
- doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
- " a specific query.",
- isPublic = false)
-
- val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
- defaultValue = Some(true), // use TUNGSTEN_ENABLED as default
- doc = "When true, use the new optimized Tungsten physical execution backend.",
- isPublic = false)
-
val SUBEXPRESSION_ELIMINATION_ENABLED = booleanConf("spark.sql.subexpressionElimination.enabled",
- defaultValue = Some(true), // use CODEGEN_ENABLED as default
+ defaultValue = Some(true),
doc = "When true, common subexpressions will be eliminated.",
isPublic = false)
@@ -475,6 +459,9 @@ private[spark] object SQLConf {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
val EXTERNAL_SORT = "spark.sql.planner.externalSort"
val USE_SQL_AGGREGATE2 = "spark.sql.useAggregate2"
+ val TUNGSTEN_ENABLED = "spark.sql.tungsten.enabled"
+ val CODEGEN_ENABLED = "spark.sql.codegen"
+ val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
}
}
@@ -541,14 +528,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
- private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))
-
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
- private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, getConf(TUNGSTEN_ENABLED))
-
private[spark] def subexpressionEliminationEnabled: Boolean =
- getConf(SUBEXPRESSION_ELIMINATION_ENABLED, codegenEnabled)
+ getConf(SUBEXPRESSION_ELIMINATION_ENABLED)
private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index b733b26987..d0e4e06809 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -58,7 +58,7 @@ case class Exchange(
* Returns true iff we can support the data type, and we are not doing range partitioning.
*/
private lazy val tungstenMode: Boolean = {
- unsafeEnabled && codegenEnabled && GenerateUnsafeProjection.canSupport(child.schema) &&
+ GenerateUnsafeProjection.canSupport(child.schema) &&
!newPartitioning.isInstanceOf[RangePartitioning]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 77843f53b9..5da5aea17e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -77,7 +77,6 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
- |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
""".stripMargin.trim
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 8650ac500b..1b833002f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -54,18 +54,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
protected def sparkContext = sqlContext.sparkContext
// sqlContext will be null when we are being deserialized on the slaves. In this instance
- // the value of codegenEnabled/unsafeEnabled will be set by the desserializer after the
+ // the value of subexpressionEliminationEnabled will be set by the desserializer after the
// constructor has run.
- val codegenEnabled: Boolean = if (sqlContext != null) {
- sqlContext.conf.codegenEnabled
- } else {
- false
- }
- val unsafeEnabled: Boolean = if (sqlContext != null) {
- sqlContext.conf.unsafeEnabled
- } else {
- false
- }
val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
sqlContext.conf.subexpressionEliminationEnabled
} else {
@@ -233,83 +223,63 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
protected def newProjection(
expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
- log.debug(
- s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if (codegenEnabled) {
- try {
- GenerateProjection.generate(expressions, inputSchema)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate projection, fallback to interpret", e)
- new InterpretedProjection(expressions, inputSchema)
- }
- }
- } else {
- new InterpretedProjection(expressions, inputSchema)
+ log.debug(s"Creating Projection: $expressions, inputSchema: $inputSchema")
+ try {
+ GenerateProjection.generate(expressions, inputSchema)
+ } catch {
+ case e: Exception =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate projection, fallback to interpret", e)
+ new InterpretedProjection(expressions, inputSchema)
+ }
}
}
protected def newMutableProjection(
- expressions: Seq[Expression],
- inputSchema: Seq[Attribute]): () => MutableProjection = {
- log.debug(
- s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if(codegenEnabled) {
- try {
- GenerateMutableProjection.generate(expressions, inputSchema)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate mutable projection, fallback to interpreted", e)
- () => new InterpretedMutableProjection(expressions, inputSchema)
- }
- }
- } else {
- () => new InterpretedMutableProjection(expressions, inputSchema)
+ expressions: Seq[Expression], inputSchema: Seq[Attribute]): () => MutableProjection = {
+ log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
+ try {
+ GenerateMutableProjection.generate(expressions, inputSchema)
+ } catch {
+ case e: Exception =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate mutable projection, fallback to interpreted", e)
+ () => new InterpretedMutableProjection(expressions, inputSchema)
+ }
}
}
protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
- if (codegenEnabled) {
- try {
- GeneratePredicate.generate(expression, inputSchema)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate predicate, fallback to interpreted", e)
- InterpretedPredicate.create(expression, inputSchema)
- }
- }
- } else {
- InterpretedPredicate.create(expression, inputSchema)
+ try {
+ GeneratePredicate.generate(expression, inputSchema)
+ } catch {
+ case e: Exception =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate predicate, fallback to interpreted", e)
+ InterpretedPredicate.create(expression, inputSchema)
+ }
}
}
protected def newOrdering(
- order: Seq[SortOrder],
- inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
- if (codegenEnabled) {
- try {
- GenerateOrdering.generate(order, inputSchema)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate ordering, fallback to interpreted", e)
- new InterpretedOrdering(order, inputSchema)
- }
- }
- } else {
- new InterpretedOrdering(order, inputSchema)
+ order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
+ try {
+ GenerateOrdering.generate(order, inputSchema)
+ } catch {
+ case e: Exception =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate ordering, fallback to interpreted", e)
+ new InterpretedOrdering(order, inputSchema)
+ }
}
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index a10d1edcc9..cf482ae4a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -27,10 +27,6 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
val sparkContext: SparkContext = sqlContext.sparkContext
- def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled
-
- def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled
-
def numPartitions: Int = sqlContext.conf.numShufflePartitions
def strategies: Seq[Strategy] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d65cb1bae7..96242f160a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -327,8 +327,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* if necessary.
*/
def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
- if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
- TungstenSort.supportsSchema(child.schema)) {
+ if (TungstenSort.supportsSchema(child.schema)) {
execution.TungstenSort(sortExprs, global, child)
} else {
execution.Sort(sortExprs, global, child)
@@ -368,8 +367,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Project(projectList, child) =>
// If unsafe mode is enabled and we support these data types in Unsafe, use the
// Tungsten project. Otherwise, use the normal project.
- if (sqlContext.conf.unsafeEnabled &&
- UnsafeProjection.canSupport(projectList) && UnsafeProjection.canSupport(child.schema)) {
+ if (UnsafeProjection.canSupport(projectList) && UnsafeProjection.canSupport(child.schema)) {
execution.TungstenProject(projectList, planLater(child)) :: Nil
} else {
execution.Project(projectList, planLater(child)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
index 79abf2d592..a70e41436c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
@@ -59,13 +59,10 @@ object Utils {
resultExpressions: Seq[NamedExpression],
child: SparkPlan): Seq[SparkPlan] = {
// Check if we can use TungstenAggregate.
- val usesTungstenAggregate =
- child.sqlContext.conf.unsafeEnabled &&
- TungstenAggregate.supportsAggregate(
+ val usesTungstenAggregate = TungstenAggregate.supportsAggregate(
groupingExpressions,
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-
// 1. Create an Aggregate Operator for partial aggregations.
val groupingAttributes = groupingExpressions.map(_.toAttribute)
@@ -144,11 +141,9 @@ object Utils {
child: SparkPlan): Seq[SparkPlan] = {
val aggregateExpressions = functionsWithDistinct ++ functionsWithoutDistinct
- val usesTungstenAggregate =
- child.sqlContext.conf.unsafeEnabled &&
- TungstenAggregate.supportsAggregate(
- groupingExpressions,
- aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
+ val usesTungstenAggregate = TungstenAggregate.supportsAggregate(
+ groupingExpressions,
+ aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
// functionsWithDistinct is guaranteed to be non-empty. Even though it may contain more than one
// DISTINCT aggregate function, all of those functions will have the same column expression.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 8b2755a587..e29c281b95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -121,6 +121,33 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
}
(keyValueOutput, runFunc)
+ case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
+ val runFunc = (sqlContext: SQLContext) => {
+ logWarning(
+ s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
+ s"will be ignored. Tungsten will continue to be used.")
+ Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
+ }
+ (keyValueOutput, runFunc)
+
+ case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) =>
+ val runFunc = (sqlContext: SQLContext) => {
+ logWarning(
+ s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " +
+ s"will be ignored. Codegen will continue to be used.")
+ Seq(Row(SQLConf.Deprecated.CODEGEN_ENABLED, "true"))
+ }
+ (keyValueOutput, runFunc)
+
+ case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) =>
+ val runFunc = (sqlContext: SQLContext) => {
+ logWarning(
+ s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " +
+ s"will be ignored. Unsafe mode will continue to be used.")
+ Seq(Row(SQLConf.Deprecated.UNSAFE_ENABLED, "true"))
+ }
+ (keyValueOutput, runFunc)
+
// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 7ce4a51783..997f7f494f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -45,9 +45,7 @@ trait HashJoin {
override def output: Seq[Attribute] = left.output ++ right.output
protected[this] def isUnsafeMode: Boolean = {
- (self.codegenEnabled && self.unsafeEnabled
- && UnsafeProjection.canSupport(buildKeys)
- && UnsafeProjection.canSupport(self.schema))
+ UnsafeProjection.canSupport(buildKeys) && UnsafeProjection.canSupport(self.schema)
}
override def outputsUnsafeRows: Boolean = isUnsafeMode
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index 15b06b1537..3633f356b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -65,9 +65,9 @@ trait HashOuterJoin {
}
protected[this] def isUnsafeMode: Boolean = {
- (self.codegenEnabled && self.unsafeEnabled && joinType != FullOuter
- && UnsafeProjection.canSupport(buildKeys)
- && UnsafeProjection.canSupport(self.schema))
+ joinType != FullOuter &&
+ UnsafeProjection.canSupport(buildKeys) &&
+ UnsafeProjection.canSupport(self.schema)
}
override def outputsUnsafeRows: Boolean = isUnsafeMode
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index beb141ade6..c7d13e0a72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -34,11 +34,10 @@ trait HashSemiJoin {
override def output: Seq[Attribute] = left.output
protected[this] def supportUnsafe: Boolean = {
- (self.codegenEnabled && self.unsafeEnabled
- && UnsafeProjection.canSupport(leftKeys)
- && UnsafeProjection.canSupport(rightKeys)
- && UnsafeProjection.canSupport(left.schema)
- && UnsafeProjection.canSupport(right.schema))
+ UnsafeProjection.canSupport(leftKeys) &&
+ UnsafeProjection.canSupport(rightKeys) &&
+ UnsafeProjection.canSupport(left.schema) &&
+ UnsafeProjection.canSupport(right.schema)
}
override def outputsUnsafeRows: Boolean = supportUnsafe
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 17030947b7..7aee8e3dd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -54,10 +54,9 @@ case class SortMergeJoin(
requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil
protected[this] def isUnsafeMode: Boolean = {
- (codegenEnabled && unsafeEnabled
- && UnsafeProjection.canSupport(leftKeys)
- && UnsafeProjection.canSupport(rightKeys)
- && UnsafeProjection.canSupport(schema))
+ UnsafeProjection.canSupport(leftKeys) &&
+ UnsafeProjection.canSupport(rightKeys) &&
+ UnsafeProjection.canSupport(schema)
}
override def outputsUnsafeRows: Boolean = isUnsafeMode
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index 7e854e6702..5f1590c463 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -90,10 +90,9 @@ case class SortMergeOuterJoin(
}
private def isUnsafeMode: Boolean = {
- (codegenEnabled && unsafeEnabled
- && UnsafeProjection.canSupport(leftKeys)
- && UnsafeProjection.canSupport(rightKeys)
- && UnsafeProjection.canSupport(schema))
+ UnsafeProjection.canSupport(leftKeys) &&
+ UnsafeProjection.canSupport(rightKeys) &&
+ UnsafeProjection.canSupport(schema)
}
override def outputsUnsafeRows: Boolean = isUnsafeMode
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
index b1dc719ca8..aef655727f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
@@ -46,10 +46,7 @@ trait HashJoinNode {
private[this] var joinKeys: Projection = _
protected def isUnsafeMode: Boolean = {
- (codegenEnabled &&
- unsafeEnabled &&
- UnsafeProjection.canSupport(schema) &&
- UnsafeProjection.canSupport(streamedKeys))
+ UnsafeProjection.canSupport(schema) && UnsafeProjection.canSupport(streamedKeys)
}
private def streamSideKeyGenerator: Projection = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index f96b62a67a..d3381eac91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -35,10 +35,6 @@ import org.apache.spark.sql.types.StructType
*/
abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Logging {
- protected val codegenEnabled: Boolean = conf.codegenEnabled
-
- protected val unsafeEnabled: Boolean = conf.unsafeEnabled
-
private[this] lazy val isTesting: Boolean = sys.props.contains("spark.testing")
/**
@@ -111,21 +107,17 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
expressions: Seq[Expression],
inputSchema: Seq[Attribute]): Projection = {
log.debug(
- s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if (codegenEnabled) {
- try {
- GenerateProjection.generate(expressions, inputSchema)
- } catch {
- case NonFatal(e) =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate projection, fallback to interpret", e)
- new InterpretedProjection(expressions, inputSchema)
- }
- }
- } else {
- new InterpretedProjection(expressions, inputSchema)
+ s"Creating Projection: $expressions, inputSchema: $inputSchema")
+ try {
+ GenerateProjection.generate(expressions, inputSchema)
+ } catch {
+ case NonFatal(e) =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate projection, fallback to interpret", e)
+ new InterpretedProjection(expressions, inputSchema)
+ }
}
}
@@ -133,41 +125,33 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
expressions: Seq[Expression],
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
- s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if (codegenEnabled) {
- try {
- GenerateMutableProjection.generate(expressions, inputSchema)
- } catch {
- case NonFatal(e) =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate mutable projection, fallback to interpreted", e)
- () => new InterpretedMutableProjection(expressions, inputSchema)
- }
- }
- } else {
- () => new InterpretedMutableProjection(expressions, inputSchema)
+ s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
+ try {
+ GenerateMutableProjection.generate(expressions, inputSchema)
+ } catch {
+ case NonFatal(e) =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate mutable projection, fallback to interpreted", e)
+ () => new InterpretedMutableProjection(expressions, inputSchema)
+ }
}
}
protected def newPredicate(
expression: Expression,
inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
- if (codegenEnabled) {
- try {
- GeneratePredicate.generate(expression, inputSchema)
- } catch {
- case NonFatal(e) =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate predicate, fallback to interpreted", e)
- InterpretedPredicate.create(expression, inputSchema)
- }
- }
- } else {
- InterpretedPredicate.create(expression, inputSchema)
+ try {
+ GeneratePredicate.generate(expression, inputSchema)
+ } catch {
+ case NonFatal(e) =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate predicate, fallback to interpreted", e)
+ InterpretedPredicate.create(expression, inputSchema)
+ }
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 5b8841bc15..48de693a99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -423,8 +423,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
- private val codegenEnabled = sqlContext.conf.codegenEnabled
-
private var _partitionSpec: PartitionSpec = _
private class FileStatusCache {
@@ -661,7 +659,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
// Yeah, to workaround serialization...
val dataSchema = this.dataSchema
- val codegenEnabled = this.codegenEnabled
val needConversion = this.needConversion
val requiredOutput = requiredColumns.map { col =>
@@ -678,11 +675,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
converted.mapPartitions { rows =>
- val buildProjection = if (codegenEnabled) {
+ val buildProjection =
GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
- } else {
- () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)
- }
val projectedRows = {
val mutableProjection = buildProjection()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f3a7aa2803..e4f23fe17b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -621,11 +621,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-6899: type should match when using codegen") {
- withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
- checkAnswer(
- decimalData.agg(avg('a)),
- Row(new java.math.BigDecimal(2.0)))
- }
+ checkAnswer(decimalData.agg(avg('a)), Row(new java.math.BigDecimal(2.0)))
}
test("SPARK-7133: Implement struct, array, and map field accessor") {
@@ -844,31 +840,16 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-8608: call `show` on local DataFrame with random columns should return same value") {
- // Make sure we can pass this test for both codegen mode and interpreted mode.
- withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
- val df = testData.select(rand(33))
- assert(df.showString(5) == df.showString(5))
- }
-
- withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "false") {
- val df = testData.select(rand(33))
- assert(df.showString(5) == df.showString(5))
- }
+ val df = testData.select(rand(33))
+ assert(df.showString(5) == df.showString(5))
// We will reuse the same Expression object for LocalRelation.
- val df = (1 to 10).map(Tuple1.apply).toDF().select(rand(33))
- assert(df.showString(5) == df.showString(5))
+ val df1 = (1 to 10).map(Tuple1.apply).toDF().select(rand(33))
+ assert(df1.showString(5) == df1.showString(5))
}
test("SPARK-8609: local DataFrame with random columns should return same value after sort") {
- // Make sure we can pass this test for both codegen mode and interpreted mode.
- withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
- checkAnswer(testData.sort(rand(33)), testData.sort(rand(33)))
- }
-
- withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "false") {
- checkAnswer(testData.sort(rand(33)), testData.sort(rand(33)))
- }
+ checkAnswer(testData.sort(rand(33)), testData.sort(rand(33)))
// We will reuse the same Expression object for LocalRelation.
val df = (1 to 10).map(Tuple1.apply).toDF()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
index 7ae12a7895..68e99d6a6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
@@ -31,52 +31,46 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext {
import testImplicits._
test("test simple types") {
- withSQLConf(SQLConf.UNSAFE_ENABLED.key -> "true") {
- val df = sparkContext.parallelize(Seq((1, 2))).toDF("a", "b")
- assert(df.select(struct("a", "b")).first().getStruct(0) === Row(1, 2))
- }
+ val df = sparkContext.parallelize(Seq((1, 2))).toDF("a", "b")
+ assert(df.select(struct("a", "b")).first().getStruct(0) === Row(1, 2))
}
test("test struct type") {
- withSQLConf(SQLConf.UNSAFE_ENABLED.key -> "true") {
- val struct = Row(1, 2L, 3.0F, 3.0)
- val data = sparkContext.parallelize(Seq(Row(1, struct)))
+ val struct = Row(1, 2L, 3.0F, 3.0)
+ val data = sparkContext.parallelize(Seq(Row(1, struct)))
- val schema = new StructType()
- .add("a", IntegerType)
- .add("b",
- new StructType()
- .add("b1", IntegerType)
- .add("b2", LongType)
- .add("b3", FloatType)
- .add("b4", DoubleType))
+ val schema = new StructType()
+ .add("a", IntegerType)
+ .add("b",
+ new StructType()
+ .add("b1", IntegerType)
+ .add("b2", LongType)
+ .add("b3", FloatType)
+ .add("b4", DoubleType))
- val df = sqlContext.createDataFrame(data, schema)
- assert(df.select("b").first() === Row(struct))
- }
+ val df = sqlContext.createDataFrame(data, schema)
+ assert(df.select("b").first() === Row(struct))
}
test("test nested struct type") {
- withSQLConf(SQLConf.UNSAFE_ENABLED.key -> "true") {
- val innerStruct = Row(1, "abcd")
- val outerStruct = Row(1, 2L, 3.0F, 3.0, innerStruct, "efg")
- val data = sparkContext.parallelize(Seq(Row(1, outerStruct)))
+ val innerStruct = Row(1, "abcd")
+ val outerStruct = Row(1, 2L, 3.0F, 3.0, innerStruct, "efg")
+ val data = sparkContext.parallelize(Seq(Row(1, outerStruct)))
- val schema = new StructType()
- .add("a", IntegerType)
- .add("b",
- new StructType()
- .add("b1", IntegerType)
- .add("b2", LongType)
- .add("b3", FloatType)
- .add("b4", DoubleType)
- .add("b5", new StructType()
- .add("b5a", IntegerType)
- .add("b5b", StringType))
- .add("b6", StringType))
+ val schema = new StructType()
+ .add("a", IntegerType)
+ .add("b",
+ new StructType()
+ .add("b1", IntegerType)
+ .add("b2", LongType)
+ .add("b3", FloatType)
+ .add("b4", DoubleType)
+ .add("b5", new StructType()
+ .add("b5a", IntegerType)
+ .add("b5b", StringType))
+ .add("b6", StringType))
- val df = sqlContext.createDataFrame(data, schema)
- assert(df.select("b").first() === Row(outerStruct))
- }
+ val df = sqlContext.createDataFrame(data, schema)
+ assert(df.select("b").first() === Row(outerStruct))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 19e850a46f..acabe32c67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -261,8 +261,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("aggregation with codegen") {
- val originalValue = sqlContext.conf.codegenEnabled
- sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
// Prepare a table that we can group some rows.
sqlContext.table("testData")
.unionAll(sqlContext.table("testData"))
@@ -347,7 +345,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Row(null, null, null, 0) :: Nil)
} finally {
sqlContext.dropTempTable("testData3x")
- sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue)
}
}
@@ -567,12 +564,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
sortTest()
}
- test("SPARK-6927 external sorting with codegen on") {
- withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
- sortTest()
- }
- }
-
test("limit") {
checkAnswer(
sql("SELECT * FROM testData LIMIT 10"),
@@ -1624,12 +1615,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("aggregation with codegen updates peak execution memory") {
- withSQLConf((SQLConf.CODEGEN_ENABLED.key, "true")) {
- AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "aggregation with codegen") {
- testCodeGen(
- "SELECT key, count(value) FROM testData GROUP BY key",
- (1 to 100).map(i => Row(i, 1)))
- }
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "aggregation with codegen") {
+ testCodeGen(
+ "SELECT key, count(value) FROM testData GROUP BY key",
+ (1 to 100).map(i => Row(i, 1)))
}
}
@@ -1783,9 +1772,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
// This bug will be triggered when Tungsten is enabled and there are multiple
// SortMergeJoin operators executed in the same task.
val confs =
- SQLConf.SORTMERGE_JOIN.key -> "true" ::
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1" ::
- SQLConf.TUNGSTEN_ENABLED.key -> "true" :: Nil
+ SQLConf.SORTMERGE_JOIN.key -> "true" :: SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1" :: Nil
withSQLConf(confs: _*) {
val df1 = (1 to 50).map(i => (s"str_$i", i)).toDF("i", "j")
val df2 =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
index 7a0f0dfd2b..85486c0889 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
@@ -31,19 +31,6 @@ import org.apache.spark.sql.types._
class TungstenSortSuite extends SparkPlanTest with SharedSQLContext {
import testImplicits.localSeqToDataFrameHolder
- override def beforeAll(): Unit = {
- super.beforeAll()
- sqlContext.conf.setConf(SQLConf.CODEGEN_ENABLED, true)
- }
-
- override def afterAll(): Unit = {
- try {
- sqlContext.conf.unsetConf(SQLConf.CODEGEN_ENABLED)
- } finally {
- super.afterAll()
- }
- }
-
test("sort followed by limit") {
checkThatPlansAgree(
(1 to 100).map(v => Tuple1(v)).toDF("a"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index dcbfdca71a..5b2998c3c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLConf, SQLContext, QueryTest}
/**
- * Test various broadcast join operators with unsafe enabled.
+ * Test various broadcast join operators.
*
* Tests in this suite we need to run Spark in local-cluster mode. In particular, the use of
* unsafe map in [[org.apache.spark.sql.execution.joins.UnsafeHashedRelation]] is not triggered
@@ -45,8 +45,6 @@ class BroadcastJoinSuite extends QueryTest with BeforeAndAfterAll {
.setAppName("testing")
val sc = new SparkContext(conf)
sqlContext = new SQLContext(sc)
- sqlContext.setConf(SQLConf.UNSAFE_ENABLED, true)
- sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
}
override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
index 8c2e78b2a9..44b0d9d410 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
@@ -28,12 +28,9 @@ import org.apache.spark.sql.execution.joins.{HashedRelation, BuildLeft, BuildRig
class HashJoinNodeSuite extends LocalNodeTest {
// Test all combinations of the two dimensions: with/out unsafe and build sides
- private val maybeUnsafeAndCodegen = Seq(false, true)
private val buildSides = Seq(BuildLeft, BuildRight)
- maybeUnsafeAndCodegen.foreach { unsafeAndCodegen =>
- buildSides.foreach { buildSide =>
- testJoin(unsafeAndCodegen, buildSide)
- }
+ buildSides.foreach { buildSide =>
+ testJoin(buildSide)
}
/**
@@ -45,10 +42,7 @@ class HashJoinNodeSuite extends LocalNodeTest {
buildKeys: Seq[Expression],
buildNode: LocalNode): HashedRelation = {
- val isUnsafeMode =
- conf.codegenEnabled &&
- conf.unsafeEnabled &&
- UnsafeProjection.canSupport(buildKeys)
+ val isUnsafeMode = UnsafeProjection.canSupport(buildKeys)
val buildSideKeyGenerator =
if (isUnsafeMode) {
@@ -68,15 +62,10 @@ class HashJoinNodeSuite extends LocalNodeTest {
/**
* Test inner hash join with varying degrees of matches.
*/
- private def testJoin(
- unsafeAndCodegen: Boolean,
- buildSide: BuildSide): Unit = {
- val simpleOrUnsafe = if (!unsafeAndCodegen) "simple" else "unsafe"
- val testNamePrefix = s"$simpleOrUnsafe / $buildSide"
+ private def testJoin(buildSide: BuildSide): Unit = {
+ val testNamePrefix = buildSide
val someData = (1 to 100).map { i => (i, "burger" + i) }.toArray
val conf = new SQLConf
- conf.setConf(SQLConf.UNSAFE_ENABLED, unsafeAndCodegen)
- conf.setConf(SQLConf.CODEGEN_ENABLED, unsafeAndCodegen)
// Actual test body
def runTest(leftInput: Array[(Int, String)], rightInput: Array[(Int, String)]): Unit = {
@@ -119,7 +108,7 @@ class HashJoinNodeSuite extends LocalNodeTest {
.map { case (k, v) => (k, v, k, rightInputMap(k)) }
Seq(makeBinaryHashJoinNode, makeBroadcastJoinNode).foreach { makeNode =>
- val makeUnsafeNode = if (unsafeAndCodegen) wrapForUnsafe(makeNode) else makeNode
+ val makeUnsafeNode = wrapForUnsafe(makeNode)
val hashJoinNode = makeUnsafeNode(leftNode, rightNode)
val actualOutput = hashJoinNode.collect().map { row =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
index 40299d9d5e..252f7cc897 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
@@ -26,30 +26,21 @@ import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
class NestedLoopJoinNodeSuite extends LocalNodeTest {
// Test all combinations of the three dimensions: with/out unsafe, build sides, and join types
- private val maybeUnsafeAndCodegen = Seq(false, true)
private val buildSides = Seq(BuildLeft, BuildRight)
private val joinTypes = Seq(LeftOuter, RightOuter, FullOuter)
- maybeUnsafeAndCodegen.foreach { unsafeAndCodegen =>
- buildSides.foreach { buildSide =>
- joinTypes.foreach { joinType =>
- testJoin(unsafeAndCodegen, buildSide, joinType)
- }
+ buildSides.foreach { buildSide =>
+ joinTypes.foreach { joinType =>
+ testJoin(buildSide, joinType)
}
}
/**
* Test outer nested loop joins with varying degrees of matches.
*/
- private def testJoin(
- unsafeAndCodegen: Boolean,
- buildSide: BuildSide,
- joinType: JoinType): Unit = {
- val simpleOrUnsafe = if (!unsafeAndCodegen) "simple" else "unsafe"
- val testNamePrefix = s"$simpleOrUnsafe / $buildSide / $joinType"
+ private def testJoin(buildSide: BuildSide, joinType: JoinType): Unit = {
+ val testNamePrefix = s"$buildSide / $joinType"
val someData = (1 to 100).map { i => (i, "burger" + i) }.toArray
val conf = new SQLConf
- conf.setConf(SQLConf.UNSAFE_ENABLED, unsafeAndCodegen)
- conf.setConf(SQLConf.CODEGEN_ENABLED, unsafeAndCodegen)
// Actual test body
def runTest(
@@ -63,7 +54,7 @@ class NestedLoopJoinNodeSuite extends LocalNodeTest {
resolveExpressions(
new NestedLoopJoinNode(conf, node1, node2, buildSide, joinType, Some(cond)))
}
- val makeUnsafeNode = if (unsafeAndCodegen) wrapForUnsafe(makeNode) else makeNode
+ val makeUnsafeNode = wrapForUnsafe(makeNode)
val hashJoinNode = makeUnsafeNode(leftNode, rightNode)
val expectedOutput = generateExpectedOutput(leftInput, rightInput, joinType)
val actualOutput = hashJoinNode.collect().map { row =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 97162249d9..544c1ef303 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -110,33 +110,23 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
}
test("Project metrics") {
- withSQLConf(
- SQLConf.UNSAFE_ENABLED.key -> "false",
- SQLConf.CODEGEN_ENABLED.key -> "false",
- SQLConf.TUNGSTEN_ENABLED.key -> "false") {
- // Assume the execution plan is
- // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
- val df = person.select('name)
- testSparkPlanMetrics(df, 1, Map(
- 0L ->("Project", Map(
- "number of rows" -> 2L)))
- )
- }
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
+ val df = person.select('name)
+ testSparkPlanMetrics(df, 1, Map(
+ 0L ->("TungstenProject", Map(
+ "number of rows" -> 2L)))
+ )
}
test("TungstenProject metrics") {
- withSQLConf(
- SQLConf.UNSAFE_ENABLED.key -> "true",
- SQLConf.CODEGEN_ENABLED.key -> "true",
- SQLConf.TUNGSTEN_ENABLED.key -> "true") {
- // Assume the execution plan is
- // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
- val df = person.select('name)
- testSparkPlanMetrics(df, 1, Map(
- 0L ->("TungstenProject", Map(
- "number of rows" -> 2L)))
- )
- }
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
+ val df = person.select('name)
+ testSparkPlanMetrics(df, 1, Map(
+ 0L ->("TungstenProject", Map(
+ "number of rows" -> 2L)))
+ )
}
test("Filter metrics") {
@@ -150,71 +140,30 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
)
}
- test("SortBasedAggregate metrics") {
- // Because SortBasedAggregate may skip different rows if the number of partitions is different,
- // this test should use the deterministic number of partitions.
- withSQLConf(
- SQLConf.UNSAFE_ENABLED.key -> "false",
- SQLConf.CODEGEN_ENABLED.key -> "true",
- SQLConf.TUNGSTEN_ENABLED.key -> "true") {
- // Assume the execution plan is
- // ... -> SortBasedAggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) ->
- // SortBasedAggregate(nodeId = 0)
- val df = testData2.groupBy().count() // 2 partitions
- testSparkPlanMetrics(df, 1, Map(
- 2L -> ("SortBasedAggregate", Map(
- "number of input rows" -> 6L,
- "number of output rows" -> 2L)),
- 0L -> ("SortBasedAggregate", Map(
- "number of input rows" -> 2L,
- "number of output rows" -> 1L)))
- )
-
- // Assume the execution plan is
- // ... -> SortBasedAggregate(nodeId = 3) -> TungstenExchange(nodeId = 2)
- // -> ExternalSort(nodeId = 1)-> SortBasedAggregate(nodeId = 0)
- // 2 partitions and each partition contains 2 keys
- val df2 = testData2.groupBy('a).count()
- testSparkPlanMetrics(df2, 1, Map(
- 3L -> ("SortBasedAggregate", Map(
- "number of input rows" -> 6L,
- "number of output rows" -> 4L)),
- 0L -> ("SortBasedAggregate", Map(
- "number of input rows" -> 4L,
- "number of output rows" -> 3L)))
- )
- }
- }
-
test("TungstenAggregate metrics") {
- withSQLConf(
- SQLConf.UNSAFE_ENABLED.key -> "true",
- SQLConf.CODEGEN_ENABLED.key -> "true",
- SQLConf.TUNGSTEN_ENABLED.key -> "true") {
- // Assume the execution plan is
- // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
- // -> TungstenAggregate(nodeId = 0)
- val df = testData2.groupBy().count() // 2 partitions
- testSparkPlanMetrics(df, 1, Map(
- 2L -> ("TungstenAggregate", Map(
- "number of input rows" -> 6L,
- "number of output rows" -> 2L)),
- 0L -> ("TungstenAggregate", Map(
- "number of input rows" -> 2L,
- "number of output rows" -> 1L)))
- )
+ // Assume the execution plan is
+ // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
+ // -> TungstenAggregate(nodeId = 0)
+ val df = testData2.groupBy().count() // 2 partitions
+ testSparkPlanMetrics(df, 1, Map(
+ 2L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 2L)),
+ 0L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 2L,
+ "number of output rows" -> 1L)))
+ )
- // 2 partitions and each partition contains 2 keys
- val df2 = testData2.groupBy('a).count()
- testSparkPlanMetrics(df2, 1, Map(
- 2L -> ("TungstenAggregate", Map(
- "number of input rows" -> 6L,
- "number of output rows" -> 4L)),
- 0L -> ("TungstenAggregate", Map(
- "number of input rows" -> 4L,
- "number of output rows" -> 3L)))
- )
- }
+ // 2 partitions and each partition contains 2 keys
+ val df2 = testData2.groupBy('a).count()
+ testSparkPlanMetrics(df2, 1, Map(
+ 2L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 6L,
+ "number of output rows" -> 4L)),
+ 0L -> ("TungstenAggregate", Map(
+ "number of input rows" -> 4L,
+ "number of output rows" -> 3L)))
+ )
}
test("SortMergeJoin metrics") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 22d2aefd69..61e3e913c2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -808,54 +808,12 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
}
}
-class SortBasedAggregationQuerySuite extends AggregationQuerySuite {
- var originalUnsafeEnabled: Boolean = _
+class TungstenAggregationQuerySuite extends AggregationQuerySuite
- override def beforeAll(): Unit = {
- originalUnsafeEnabled = sqlContext.conf.unsafeEnabled
- sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, "false")
- super.beforeAll()
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, originalUnsafeEnabled.toString)
- }
-}
-
-class TungstenAggregationQuerySuite extends AggregationQuerySuite {
-
- var originalUnsafeEnabled: Boolean = _
-
- override def beforeAll(): Unit = {
- originalUnsafeEnabled = sqlContext.conf.unsafeEnabled
- sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, "true")
- super.beforeAll()
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, originalUnsafeEnabled.toString)
- }
-}
class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite {
- var originalUnsafeEnabled: Boolean = _
-
- override def beforeAll(): Unit = {
- originalUnsafeEnabled = sqlContext.conf.unsafeEnabled
- sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, "true")
- super.beforeAll()
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- sqlContext.setConf(SQLConf.UNSAFE_ENABLED.key, originalUnsafeEnabled.toString)
- sqlContext.conf.unsetConf("spark.sql.TungstenAggregate.testFallbackStartsAt")
- }
-
override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
(0 to 2).foreach { fallbackStartsAt =>
sqlContext.setConf(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 94162da4ea..a7b7ad0093 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -37,8 +37,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
"== Parsed Logical Plan ==",
"== Analyzed Logical Plan ==",
"== Optimized Logical Plan ==",
- "== Physical Plan ==",
- "Code Generation")
+ "== Physical Plan ==")
}
test("explain create table command") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 5f9a447759..5ab477efc4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -28,11 +28,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats}
import org.apache.hadoop.io.Writable
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.hive.test.TestHiveSingleton
-
import org.apache.spark.util.Utils
+
case class Fields(f1: Int, f2: Int, f3: Int, f4: Int, f5: Int)
// Case classes for the custom UDF's.
@@ -92,44 +92,36 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton {
}
test("Max/Min on named_struct") {
- def testOrderInStruct(): Unit = {
- checkAnswer(sql(
- """
- |SELECT max(named_struct(
- | "key", key,
- | "value", value)).value FROM src
- """.stripMargin), Seq(Row("val_498")))
- checkAnswer(sql(
- """
- |SELECT min(named_struct(
- | "key", key,
- | "value", value)).value FROM src
- """.stripMargin), Seq(Row("val_0")))
-
- // nested struct cases
- checkAnswer(sql(
- """
- |SELECT max(named_struct(
- | "key", named_struct(
- "key", key,
- "value", value),
- | "value", value)).value FROM src
- """.stripMargin), Seq(Row("val_498")))
- checkAnswer(sql(
- """
- |SELECT min(named_struct(
- | "key", named_struct(
- "key", key,
- "value", value),
- | "value", value)).value FROM src
- """.stripMargin), Seq(Row("val_0")))
- }
- val codegenDefault = hiveContext.getConf(SQLConf.CODEGEN_ENABLED)
- hiveContext.setConf(SQLConf.CODEGEN_ENABLED, true)
- testOrderInStruct()
- hiveContext.setConf(SQLConf.CODEGEN_ENABLED, false)
- testOrderInStruct()
- hiveContext.setConf(SQLConf.CODEGEN_ENABLED, codegenDefault)
+ checkAnswer(sql(
+ """
+ |SELECT max(named_struct(
+ | "key", key,
+ | "value", value)).value FROM src
+ """.stripMargin), Seq(Row("val_498")))
+ checkAnswer(sql(
+ """
+ |SELECT min(named_struct(
+ | "key", key,
+ | "value", value)).value FROM src
+ """.stripMargin), Seq(Row("val_0")))
+
+ // nested struct cases
+ checkAnswer(sql(
+ """
+ |SELECT max(named_struct(
+ | "key", named_struct(
+ "key", key,
+ "value", value),
+ | "value", value)).value FROM src
+ """.stripMargin), Seq(Row("val_498")))
+ checkAnswer(sql(
+ """
+ |SELECT min(named_struct(
+ | "key", named_struct(
+ "key", key,
+ "value", value),
+ | "value", value)).value FROM src
+ """.stripMargin), Seq(Row("val_0")))
}
test("SPARK-6409 UDAF Average test") {