From 4e70e8256ce2f45b438642372329eac7b1e9e8cf Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 6 Aug 2015 17:30:31 -0700 Subject: [SPARK-9228] [SQL] use tungsten.enabled in public for both of codegen/unsafe spark.sql.tungsten.enabled will be the default value for both codegen and unsafe, they are kept internally for debug/testing. cc marmbrus rxin Author: Davies Liu Closes #7998 from davies/tungsten and squashes the following commits: c1c16da [Davies Liu] update doc 1a47be1 [Davies Liu] use tungsten.enabled for both of codegen/unsafe --- docs/sql-programming-guide.md | 6 +++--- .../main/scala/org/apache/spark/sql/SQLConf.scala | 20 +++++++++++++------- .../org/apache/spark/sql/execution/SparkPlan.scala | 8 +++++++- .../apache/spark/sql/execution/joins/HashJoin.scala | 3 ++- .../spark/sql/execution/joins/HashOuterJoin.scala | 2 +- .../spark/sql/execution/joins/HashSemiJoin.scala | 3 ++- 6 files changed, 28 insertions(+), 14 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 3ea77e8242..6c317175d3 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1884,11 +1884,11 @@ that these options will be deprecated in future release as more optimizations ar - spark.sql.codegen + spark.sql.tungsten.enabled true - When true, code will be dynamically generated at runtime for expression evaluation in a specific - query. For some queries with complicated expression this option can lead to significant speed-ups. + When true, use the optimized Tungsten physical execution backend which explicitly manages memory + and dynamically generates bytecode for expression evaluation. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index f836122b3e..ef35c133d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -223,14 +223,21 @@ private[spark] object SQLConf { defaultValue = Some(200), doc = "The default number of partitions to use when shuffling data for joins or aggregations.") - val CODEGEN_ENABLED = booleanConf("spark.sql.codegen", + val TUNGSTEN_ENABLED = booleanConf("spark.sql.tungsten.enabled", defaultValue = Some(true), + doc = "When true, use the optimized Tungsten physical execution backend which explicitly " + + "manages memory and dynamically generates bytecode for expression evaluation.") + + val CODEGEN_ENABLED = booleanConf("spark.sql.codegen", + defaultValue = Some(true), // use TUNGSTEN_ENABLED as default doc = "When true, code will be dynamically generated at runtime for expression evaluation in" + - " a specific query.") + " a specific query.", + isPublic = false) val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled", - defaultValue = Some(true), - doc = "When true, use the new optimized Tungsten physical execution backend.") + defaultValue = Some(true), // use TUNGSTEN_ENABLED as default + doc = "When true, use the new optimized Tungsten physical execution backend.", + isPublic = false) val DIALECT = stringConf( "spark.sql.dialect", @@ -427,7 +434,6 @@ private[spark] object SQLConf { * * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads). */ - private[sql] class SQLConf extends Serializable with CatalystConf { import SQLConf._ @@ -474,11 +480,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN) - private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED) + private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED)) def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE) - private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED) + private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, getConf(TUNGSTEN_ENABLED)) private[spark] def useSqlAggregate2: Boolean = getConf(USE_SQL_AGGREGATE2) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 2f29067f56..3fff79cd1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -55,12 +55,18 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def sparkContext = sqlContext.sparkContext // sqlContext will be null when we are being deserialized on the slaves. In this instance - // the value of codegenEnabled will be set by the desserializer after the constructor has run. + // the value of codegenEnabled/unsafeEnabled will be set by the desserializer after the + // constructor has run. val codegenEnabled: Boolean = if (sqlContext != null) { sqlContext.conf.codegenEnabled } else { false } + val unsafeEnabled: Boolean = if (sqlContext != null) { + sqlContext.conf.unsafeEnabled + } else { + false + } /** * Whether the "prepare" method is called. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 5e9cd9fd23..22d46d1c3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -44,7 +44,8 @@ trait HashJoin { override def output: Seq[Attribute] = left.output ++ right.output protected[this] def isUnsafeMode: Boolean = { - (self.codegenEnabled && UnsafeProjection.canSupport(buildKeys) + (self.codegenEnabled && self.unsafeEnabled + && UnsafeProjection.canSupport(buildKeys) && UnsafeProjection.canSupport(self.schema)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index 346337e642..701bd3cd86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -67,7 +67,7 @@ trait HashOuterJoin { } protected[this] def isUnsafeMode: Boolean = { - (self.codegenEnabled && joinType != FullOuter + (self.codegenEnabled && self.unsafeEnabled && joinType != FullOuter && UnsafeProjection.canSupport(buildKeys) && UnsafeProjection.canSupport(self.schema)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala index 47a7d370f5..82dd6eb7e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala @@ -33,7 +33,8 @@ trait HashSemiJoin { override def output: Seq[Attribute] = left.output protected[this] def supportUnsafe: Boolean = { - (self.codegenEnabled && UnsafeProjection.canSupport(leftKeys) + (self.codegenEnabled && self.unsafeEnabled + && UnsafeProjection.canSupport(leftKeys) && UnsafeProjection.canSupport(rightKeys) && UnsafeProjection.canSupport(left.schema) && UnsafeProjection.canSupport(right.schema)) -- cgit v1.2.3